[Git][java-team/disruptor][upstream] 3 commits: New upstream version 3.3.9

Emmanuel Bourg gitlab at salsa.debian.org
Mon Jun 4 21:48:00 BST 2018


Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / disruptor


Commits:
7534ec40 by Emmanuel Bourg at 2018-06-04T22:34:04+02:00
New upstream version 3.3.9
- - - - -
d19d76c0 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
New upstream version 3.3.10
- - - - -
2b92f9b2 by Emmanuel Bourg at 2018-06-04T22:34:49+02:00
New upstream version 3.3.11
- - - - -


5 changed files:

- README.md
- build.gradle
- src/main/java/com/lmax/disruptor/BatchEventProcessor.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java


Changes:

=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,6 +15,18 @@ A High Performance Inter-Thread Messaging Library
 
 ## Changelog
 
+### 3.3.11
+
+- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
+
+### 3.3.10
+
+- Fix race condition in BatchEventProcessor between run() and halt().
+
+### 3.3.9
+
+- Change SleepingWaitStrategy to use a parkNanos(100).
+
 ### 3.3.8
 
 - Revert belt and braces WaitStategy signalling.


=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
 defaultTasks 'build'
 
 group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 8)
+version = new Version(major: 3, minor: 3, revision: 11)
 
 ext {
     fullName = 'Disruptor Framework'


=====================================
src/main/java/com/lmax/disruptor/BatchEventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.lmax.disruptor;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
@@ -30,7 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public final class BatchEventProcessor<T>
     implements EventProcessor
 {
-    private final AtomicBoolean running = new AtomicBoolean(false);
+    private static final int IDLE = 0;
+    private static final int HALTED = IDLE + 1;
+    private static final int RUNNING = HALTED + 1;
+
+    private final AtomicInteger running = new AtomicInteger(IDLE);
     private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
     private final DataProvider<T> dataProvider;
     private final SequenceBarrier sequenceBarrier;
@@ -62,9 +66,9 @@ public final class BatchEventProcessor<T>
         }
 
         batchStartAware =
-                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
+            (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
         timeoutHandler =
-                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
+            (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
     }
 
     @Override
@@ -76,14 +80,14 @@ public final class BatchEventProcessor<T>
     @Override
     public void halt()
     {
-        running.set(false);
+        running.set(HALTED);
         sequenceBarrier.alert();
     }
 
     @Override
     public boolean isRunning()
     {
-        return running.get();
+        return running.get() != IDLE;
     }
 
     /**
@@ -109,61 +113,88 @@ public final class BatchEventProcessor<T>
     @Override
     public void run()
     {
-        if (!running.compareAndSet(false, true))
+        if (running.compareAndSet(IDLE, RUNNING))
         {
-            throw new IllegalStateException("Thread is already running");
-        }
-        sequenceBarrier.clearAlert();
+            sequenceBarrier.clearAlert();
 
-        notifyStart();
+            notifyStart();
+            try
+            {
+                if (running.get() == RUNNING)
+                {
+                    processEvents();
+                }
+            }
+            finally
+            {
+                notifyShutdown();
+                running.set(IDLE);
+            }
+        }
+        else
+        {
+            // This is a little bit of guess work.  The running state could of changed to HALTED by
+            // this point.  However, Java does not have compareAndExchange which is the only way
+            // to get it exactly correct.
+            if (running.get() == RUNNING)
+            {
+                throw new IllegalStateException("Thread is already running");
+            }
+            else
+            {
+                earlyExit();
+            }
+        }
+    }
 
+    private void processEvents()
+    {
         T event = null;
         long nextSequence = sequence.get() + 1L;
-        try
+
+        while (true)
         {
-            while (true)
+            try
             {
-                try
-                {
-                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
-                    if (batchStartAware != null)
-                    {
-                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
-                    }
-
-                    while (nextSequence <= availableSequence)
-                    {
-                        event = dataProvider.get(nextSequence);
-                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
-                        nextSequence++;
-                    }
-
-                    sequence.set(availableSequence);
-                }
-                catch (final TimeoutException e)
+                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+                if (batchStartAware != null)
                 {
-                    notifyTimeout(sequence.get());
+                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                 }
-                catch (final AlertException ex)
+
+                while (nextSequence <= availableSequence)
                 {
-                    if (!running.get())
-                    {
-                        break;
-                    }
+                    event = dataProvider.get(nextSequence);
+                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
+                    nextSequence++;
                 }
-                catch (final Throwable ex)
+
+                sequence.set(availableSequence);
+            }
+            catch (final TimeoutException e)
+            {
+                notifyTimeout(sequence.get());
+            }
+            catch (final AlertException ex)
+            {
+                if (running.get() != RUNNING)
                 {
-                    exceptionHandler.handleEventException(ex, nextSequence, event);
-                    sequence.set(nextSequence);
-                    nextSequence++;
+                    break;
                 }
             }
+            catch (final Throwable ex)
+            {
+                exceptionHandler.handleEventException(ex, nextSequence, event);
+                sequence.set(nextSequence);
+                nextSequence++;
+            }
         }
-        finally
-        {
-            notifyShutdown();
-            running.set(false);
-        }
+    }
+
+    private void earlyExit()
+    {
+        notifyStart();
+        notifyShutdown();
     }
 
     private void notifyTimeout(final long availableSequence)


=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -29,23 +29,31 @@ import java.util.concurrent.locks.LockSupport;
 public final class SleepingWaitStrategy implements WaitStrategy
 {
     private static final int DEFAULT_RETRIES = 200;
+    private static final long DEFAULT_SLEEP = 100;
 
     private final int retries;
+    private final long sleepTimeNs;
 
     public SleepingWaitStrategy()
     {
-        this(DEFAULT_RETRIES);
+        this(DEFAULT_RETRIES, DEFAULT_SLEEP);
     }
 
     public SleepingWaitStrategy(int retries)
     {
+        this(retries, DEFAULT_SLEEP);
+    }
+
+    public SleepingWaitStrategy(int retries, long sleepTimeNs)
+    {
         this.retries = retries;
+        this.sleepTimeNs = sleepTimeNs;
     }
 
     @Override
     public long waitFor(
         final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
-        throws AlertException, InterruptedException
+        throws AlertException
     {
         long availableSequence;
         int counter = retries;
@@ -79,7 +87,7 @@ public final class SleepingWaitStrategy implements WaitStrategy
         }
         else
         {
-            LockSupport.parkNanos(1L);
+            LockSupport.parkNanos(sleepTimeNs);
         }
 
         return counter;


=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -187,4 +187,91 @@ public final class BatchEventProcessorTest
 
         assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes);
     }
+
+    @Test
+    public void shouldAlwaysHalt() throws InterruptedException
+    {
+        WaitStrategy waitStrategy = new BusySpinWaitStrategy();
+        final SingleProducerSequencer sequencer = new SingleProducerSequencer(8, waitStrategy);
+        final ProcessingSequenceBarrier barrier = new ProcessingSequenceBarrier(
+            sequencer, waitStrategy, new Sequence(-1), new Sequence[0]);
+        DataProvider<Object> dp = new DataProvider<Object>()
+        {
+            @Override
+            public Object get(long sequence)
+            {
+                return null;
+            }
+        };
+
+        final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
+        final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+
+        Thread t1 = new Thread(p1);
+        p1.halt();
+        t1.start();
+
+        assertTrue(h1.awaitStart(2, TimeUnit.SECONDS));
+        assertTrue(h1.awaitStop(2, TimeUnit.SECONDS));
+
+        for (int i = 0; i < 1000; i++)
+        {
+            final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            Thread t2 = new Thread(p2);
+            t2.start();
+            p2.halt();
+
+            assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+            assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            Thread t2 = new Thread(p2);
+            t2.start();
+            Thread.yield();
+            p2.halt();
+
+            assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+            assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+        }
+    }
+
+    private static class LatchLifeCycleHandler implements EventHandler<Object>, LifecycleAware
+    {
+        private final CountDownLatch startLatch = new CountDownLatch(1);
+        private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+        @Override
+        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception
+        {
+
+        }
+
+        @Override
+        public void onStart()
+        {
+            startLatch.countDown();
+        }
+
+        @Override
+        public void onShutdown()
+        {
+            stopLatch.countDown();
+        }
+
+        public boolean awaitStart(long time, TimeUnit unit) throws InterruptedException
+        {
+            return startLatch.await(time, unit);
+        }
+
+
+        public boolean awaitStop(long time, TimeUnit unit) throws InterruptedException
+        {
+            return stopLatch.await(time, unit);
+        }
+    }
 }



View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/c26cf52ca4f2c39e3994959ee800bba6b5062b60...2b92f9b235ea7ce41b6ec941c2a1451c3b9c9214

-- 
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/c26cf52ca4f2c39e3994959ee800bba6b5062b60...2b92f9b235ea7ce41b6ec941c2a1451c3b9c9214
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180604/c2cc7d25/attachment.html>


More information about the pkg-java-commits mailing list