[Git][java-team/disruptor][master] 9 commits: Standards-Version updated to 4.1.4

Emmanuel Bourg gitlab at salsa.debian.org
Mon Jun 4 21:47:50 BST 2018


Emmanuel Bourg pushed to branch master at Debian Java Maintainers / disruptor


Commits:
c256be5f by Emmanuel Bourg at 2018-06-04T22:31:48+02:00
Standards-Version updated to 4.1.4

- - - - -
7534ec40 by Emmanuel Bourg at 2018-06-04T22:34:04+02:00
New upstream version 3.3.9
- - - - -
09b0feb9 by Emmanuel Bourg at 2018-06-04T22:34:05+02:00
Update upstream source from tag 'upstream/3.3.9'

Update to upstream version '3.3.9'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
d19d76c0 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
New upstream version 3.3.10
- - - - -
630c1583 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
Update upstream source from tag 'upstream/3.3.10'

Update to upstream version '3.3.10'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
2b92f9b2 by Emmanuel Bourg at 2018-06-04T22:34:49+02:00
New upstream version 3.3.11
- - - - -
c47ce5e1 by Emmanuel Bourg at 2018-06-04T22:34:50+02:00
Update upstream source from tag 'upstream/3.3.11'

Update to upstream version '3.3.11'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
4f0b807a by Emmanuel Bourg at 2018-06-04T22:40:22+02:00
Use salsa.debian.org Vcs-* URLs

- - - - -
be611096 by Emmanuel Bourg at 2018-06-04T22:47:07+02:00
New upstream release (3.3.11)

- - - - -


9 changed files:

- README.md
- build.gradle
- debian/changelog
- debian/control
- debian/pom.xml
- debian/rules
- src/main/java/com/lmax/disruptor/BatchEventProcessor.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java


Changes:

=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,6 +15,18 @@ A High Performance Inter-Thread Messaging Library
 
 ## Changelog
 
+### 3.3.11
+
+- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
+
+### 3.3.10
+
+- Fix race condition in BatchEventProcessor between run() and halt().
+
+### 3.3.9
+
+- Change SleepingWaitStrategy to use a parkNanos(100).
+
 ### 3.3.8
 
 - Revert belt and braces WaitStategy signalling.


=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
 defaultTasks 'build'
 
 group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 8)
+version = new Version(major: 3, minor: 3, revision: 11)
 
 ext {
     fullName = 'Disruptor Framework'


=====================================
debian/changelog
=====================================
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,12 @@
+disruptor (3.3.11-1) unstable; urgency=medium
+
+  * Team upload.
+  * New upstream release
+  * Standards-Version updated to 4.1.4
+  * Use salsa.debian.org Vcs-* URLs
+
+ -- Emmanuel Bourg <ebourg at apache.org>  Mon, 04 Jun 2018 22:40:39 +0200
+
 disruptor (3.3.8-1) unstable; urgency=medium
 
   [ tony mancill ]


=====================================
debian/control
=====================================
--- a/debian/control
+++ b/debian/control
@@ -5,9 +5,9 @@ Maintainer: Debian Java Maintainers <pkg-java-maintainers at lists.alioth.debian.or
 Uploaders: Emmanuel Bourg <ebourg at apache.org>,
  tony mancill <tmancill at debian.org>
 Build-Depends: debhelper (>= 11), default-jdk, maven-debian-helper (>= 1.5)
-Standards-Version: 4.1.3
-Vcs-Git: https://anonscm.debian.org/git/pkg-java/disruptor.git
-Vcs-Browser: https://anonscm.debian.org/cgit/pkg-java/disruptor.git
+Standards-Version: 4.1.4
+Vcs-Git: https://salsa.debian.org/java-team/disruptor.git
+Vcs-Browser: https://salsa.debian.org/java-team/disruptor
 Homepage: https://github.com/LMAX-Exchange/disruptor/wiki
 
 Package: libdisruptor-java


=====================================
debian/pom.xml
=====================================
--- a/debian/pom.xml
+++ b/debian/pom.xml
@@ -4,7 +4,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.lmax</groupId>
   <artifactId>disruptor</artifactId>
-  <version>3.3.7</version>
+  <version>3.3.11</version>
   <name>Disruptor Framework</name>
   <description>Disruptor - Concurrent Programming Framework</description>
   <url>http://lmax-exchange.github.com/disruptor</url>


=====================================
debian/rules
=====================================
--- a/debian/rules
+++ b/debian/rules
@@ -17,8 +17,5 @@ override_dh_clean:
 	dh_clean
 	rm -f pom.xml
 
-get-orig-source:
-	uscan --download-current-version --force-download --verbose
-
 get-orig-pom:
 	wget http://central.maven.org/maven2/com/lmax/disruptor/$(VERSION)/disruptor-$(VERSION).pom -O debian/pom.xml


=====================================
src/main/java/com/lmax/disruptor/BatchEventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.lmax.disruptor;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
@@ -30,7 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public final class BatchEventProcessor<T>
     implements EventProcessor
 {
-    private final AtomicBoolean running = new AtomicBoolean(false);
+    private static final int IDLE = 0;
+    private static final int HALTED = IDLE + 1;
+    private static final int RUNNING = HALTED + 1;
+
+    private final AtomicInteger running = new AtomicInteger(IDLE);
     private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
     private final DataProvider<T> dataProvider;
     private final SequenceBarrier sequenceBarrier;
@@ -62,9 +66,9 @@ public final class BatchEventProcessor<T>
         }
 
         batchStartAware =
-                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
+            (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
         timeoutHandler =
-                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
+            (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
     }
 
     @Override
@@ -76,14 +80,14 @@ public final class BatchEventProcessor<T>
     @Override
     public void halt()
     {
-        running.set(false);
+        running.set(HALTED);
         sequenceBarrier.alert();
     }
 
     @Override
     public boolean isRunning()
     {
-        return running.get();
+        return running.get() != IDLE;
     }
 
     /**
@@ -109,61 +113,88 @@ public final class BatchEventProcessor<T>
     @Override
     public void run()
     {
-        if (!running.compareAndSet(false, true))
+        if (running.compareAndSet(IDLE, RUNNING))
         {
-            throw new IllegalStateException("Thread is already running");
-        }
-        sequenceBarrier.clearAlert();
+            sequenceBarrier.clearAlert();
 
-        notifyStart();
+            notifyStart();
+            try
+            {
+                if (running.get() == RUNNING)
+                {
+                    processEvents();
+                }
+            }
+            finally
+            {
+                notifyShutdown();
+                running.set(IDLE);
+            }
+        }
+        else
+        {
+            // This is a little bit of guess work.  The running state could of changed to HALTED by
+            // this point.  However, Java does not have compareAndExchange which is the only way
+            // to get it exactly correct.
+            if (running.get() == RUNNING)
+            {
+                throw new IllegalStateException("Thread is already running");
+            }
+            else
+            {
+                earlyExit();
+            }
+        }
+    }
 
+    private void processEvents()
+    {
         T event = null;
         long nextSequence = sequence.get() + 1L;
-        try
+
+        while (true)
         {
-            while (true)
+            try
             {
-                try
-                {
-                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
-                    if (batchStartAware != null)
-                    {
-                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
-                    }
-
-                    while (nextSequence <= availableSequence)
-                    {
-                        event = dataProvider.get(nextSequence);
-                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
-                        nextSequence++;
-                    }
-
-                    sequence.set(availableSequence);
-                }
-                catch (final TimeoutException e)
+                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+                if (batchStartAware != null)
                 {
-                    notifyTimeout(sequence.get());
+                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                 }
-                catch (final AlertException ex)
+
+                while (nextSequence <= availableSequence)
                 {
-                    if (!running.get())
-                    {
-                        break;
-                    }
+                    event = dataProvider.get(nextSequence);
+                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
+                    nextSequence++;
                 }
-                catch (final Throwable ex)
+
+                sequence.set(availableSequence);
+            }
+            catch (final TimeoutException e)
+            {
+                notifyTimeout(sequence.get());
+            }
+            catch (final AlertException ex)
+            {
+                if (running.get() != RUNNING)
                 {
-                    exceptionHandler.handleEventException(ex, nextSequence, event);
-                    sequence.set(nextSequence);
-                    nextSequence++;
+                    break;
                 }
             }
+            catch (final Throwable ex)
+            {
+                exceptionHandler.handleEventException(ex, nextSequence, event);
+                sequence.set(nextSequence);
+                nextSequence++;
+            }
         }
-        finally
-        {
-            notifyShutdown();
-            running.set(false);
-        }
+    }
+
+    private void earlyExit()
+    {
+        notifyStart();
+        notifyShutdown();
     }
 
     private void notifyTimeout(final long availableSequence)


=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -29,23 +29,31 @@ import java.util.concurrent.locks.LockSupport;
 public final class SleepingWaitStrategy implements WaitStrategy
 {
     private static final int DEFAULT_RETRIES = 200;
+    private static final long DEFAULT_SLEEP = 100;
 
     private final int retries;
+    private final long sleepTimeNs;
 
     public SleepingWaitStrategy()
     {
-        this(DEFAULT_RETRIES);
+        this(DEFAULT_RETRIES, DEFAULT_SLEEP);
     }
 
     public SleepingWaitStrategy(int retries)
     {
+        this(retries, DEFAULT_SLEEP);
+    }
+
+    public SleepingWaitStrategy(int retries, long sleepTimeNs)
+    {
         this.retries = retries;
+        this.sleepTimeNs = sleepTimeNs;
     }
 
     @Override
     public long waitFor(
         final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
-        throws AlertException, InterruptedException
+        throws AlertException
     {
         long availableSequence;
         int counter = retries;
@@ -79,7 +87,7 @@ public final class SleepingWaitStrategy implements WaitStrategy
         }
         else
         {
-            LockSupport.parkNanos(1L);
+            LockSupport.parkNanos(sleepTimeNs);
         }
 
         return counter;


=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -187,4 +187,91 @@ public final class BatchEventProcessorTest
 
         assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes);
     }
+
+    @Test
+    public void shouldAlwaysHalt() throws InterruptedException
+    {
+        WaitStrategy waitStrategy = new BusySpinWaitStrategy();
+        final SingleProducerSequencer sequencer = new SingleProducerSequencer(8, waitStrategy);
+        final ProcessingSequenceBarrier barrier = new ProcessingSequenceBarrier(
+            sequencer, waitStrategy, new Sequence(-1), new Sequence[0]);
+        DataProvider<Object> dp = new DataProvider<Object>()
+        {
+            @Override
+            public Object get(long sequence)
+            {
+                return null;
+            }
+        };
+
+        final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
+        final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+
+        Thread t1 = new Thread(p1);
+        p1.halt();
+        t1.start();
+
+        assertTrue(h1.awaitStart(2, TimeUnit.SECONDS));
+        assertTrue(h1.awaitStop(2, TimeUnit.SECONDS));
+
+        for (int i = 0; i < 1000; i++)
+        {
+            final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            Thread t2 = new Thread(p2);
+            t2.start();
+            p2.halt();
+
+            assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+            assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            Thread t2 = new Thread(p2);
+            t2.start();
+            Thread.yield();
+            p2.halt();
+
+            assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+            assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+        }
+    }
+
+    private static class LatchLifeCycleHandler implements EventHandler<Object>, LifecycleAware
+    {
+        private final CountDownLatch startLatch = new CountDownLatch(1);
+        private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+        @Override
+        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception
+        {
+
+        }
+
+        @Override
+        public void onStart()
+        {
+            startLatch.countDown();
+        }
+
+        @Override
+        public void onShutdown()
+        {
+            stopLatch.countDown();
+        }
+
+        public boolean awaitStart(long time, TimeUnit unit) throws InterruptedException
+        {
+            return startLatch.await(time, unit);
+        }
+
+
+        public boolean awaitStop(long time, TimeUnit unit) throws InterruptedException
+        {
+            return stopLatch.await(time, unit);
+        }
+    }
 }



View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/6bf5ca156d6ad206d68b168884fe88523f6d6702...be6110967119c94f54f922c68e17dceb725c106f

-- 
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/6bf5ca156d6ad206d68b168884fe88523f6d6702...be6110967119c94f54f922c68e17dceb725c106f
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180604/1469e672/attachment.html>


More information about the pkg-java-commits mailing list