[Git][java-team/disruptor][master] 9 commits: Standards-Version updated to 4.1.4
Emmanuel Bourg
gitlab at salsa.debian.org
Mon Jun 4 21:47:50 BST 2018
Emmanuel Bourg pushed to branch master at Debian Java Maintainers / disruptor
Commits:
c256be5f by Emmanuel Bourg at 2018-06-04T22:31:48+02:00
Standards-Version updated to 4.1.4
- - - - -
7534ec40 by Emmanuel Bourg at 2018-06-04T22:34:04+02:00
New upstream version 3.3.9
- - - - -
09b0feb9 by Emmanuel Bourg at 2018-06-04T22:34:05+02:00
Update upstream source from tag 'upstream/3.3.9'
Update to upstream version '3.3.9'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
d19d76c0 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
New upstream version 3.3.10
- - - - -
630c1583 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
Update upstream source from tag 'upstream/3.3.10'
Update to upstream version '3.3.10'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
2b92f9b2 by Emmanuel Bourg at 2018-06-04T22:34:49+02:00
New upstream version 3.3.11
- - - - -
c47ce5e1 by Emmanuel Bourg at 2018-06-04T22:34:50+02:00
Update upstream source from tag 'upstream/3.3.11'
Update to upstream version '3.3.11'
with Debian dir 08cc71f6925e45b0274cac9378e6de29ec183f72
- - - - -
4f0b807a by Emmanuel Bourg at 2018-06-04T22:40:22+02:00
Use salsa.debian.org Vcs-* URLs
- - - - -
be611096 by Emmanuel Bourg at 2018-06-04T22:47:07+02:00
New upstream release (3.3.11)
- - - - -
9 changed files:
- README.md
- build.gradle
- debian/changelog
- debian/control
- debian/pom.xml
- debian/rules
- src/main/java/com/lmax/disruptor/BatchEventProcessor.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
Changes:
=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,6 +15,18 @@ A High Performance Inter-Thread Messaging Library
## Changelog
+### 3.3.11
+
+- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
+
+### 3.3.10
+
+- Fix race condition in BatchEventProcessor between run() and halt().
+
+### 3.3.9
+
+- Change SleepingWaitStrategy to use a parkNanos(100).
+
### 3.3.8
- Revert belt and braces WaitStategy signalling.
=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks 'build'
group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 8)
+version = new Version(major: 3, minor: 3, revision: 11)
ext {
fullName = 'Disruptor Framework'
=====================================
debian/changelog
=====================================
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,12 @@
+disruptor (3.3.11-1) unstable; urgency=medium
+
+ * Team upload.
+ * New upstream release
+ * Standards-Version updated to 4.1.4
+ * Use salsa.debian.org Vcs-* URLs
+
+ -- Emmanuel Bourg <ebourg at apache.org> Mon, 04 Jun 2018 22:40:39 +0200
+
disruptor (3.3.8-1) unstable; urgency=medium
[ tony mancill ]
=====================================
debian/control
=====================================
--- a/debian/control
+++ b/debian/control
@@ -5,9 +5,9 @@ Maintainer: Debian Java Maintainers <pkg-java-maintainers at lists.alioth.debian.or
Uploaders: Emmanuel Bourg <ebourg at apache.org>,
tony mancill <tmancill at debian.org>
Build-Depends: debhelper (>= 11), default-jdk, maven-debian-helper (>= 1.5)
-Standards-Version: 4.1.3
-Vcs-Git: https://anonscm.debian.org/git/pkg-java/disruptor.git
-Vcs-Browser: https://anonscm.debian.org/cgit/pkg-java/disruptor.git
+Standards-Version: 4.1.4
+Vcs-Git: https://salsa.debian.org/java-team/disruptor.git
+Vcs-Browser: https://salsa.debian.org/java-team/disruptor
Homepage: https://github.com/LMAX-Exchange/disruptor/wiki
Package: libdisruptor-java
=====================================
debian/pom.xml
=====================================
--- a/debian/pom.xml
+++ b/debian/pom.xml
@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
- <version>3.3.7</version>
+ <version>3.3.11</version>
<name>Disruptor Framework</name>
<description>Disruptor - Concurrent Programming Framework</description>
<url>http://lmax-exchange.github.com/disruptor</url>
=====================================
debian/rules
=====================================
--- a/debian/rules
+++ b/debian/rules
@@ -17,8 +17,5 @@ override_dh_clean:
dh_clean
rm -f pom.xml
-get-orig-source:
- uscan --download-current-version --force-download --verbose
-
get-orig-pom:
wget http://central.maven.org/maven2/com/lmax/disruptor/$(VERSION)/disruptor-$(VERSION).pom -O debian/pom.xml
=====================================
src/main/java/com/lmax/disruptor/BatchEventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
@@ -15,7 +15,7 @@
*/
package com.lmax.disruptor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -30,7 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public final class BatchEventProcessor<T>
implements EventProcessor
{
- private final AtomicBoolean running = new AtomicBoolean(false);
+ private static final int IDLE = 0;
+ private static final int HALTED = IDLE + 1;
+ private static final int RUNNING = HALTED + 1;
+
+ private final AtomicInteger running = new AtomicInteger(IDLE);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
@@ -62,9 +66,9 @@ public final class BatchEventProcessor<T>
}
batchStartAware =
- (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
+ (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
- (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
+ (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
@Override
@@ -76,14 +80,14 @@ public final class BatchEventProcessor<T>
@Override
public void halt()
{
- running.set(false);
+ running.set(HALTED);
sequenceBarrier.alert();
}
@Override
public boolean isRunning()
{
- return running.get();
+ return running.get() != IDLE;
}
/**
@@ -109,61 +113,88 @@ public final class BatchEventProcessor<T>
@Override
public void run()
{
- if (!running.compareAndSet(false, true))
+ if (running.compareAndSet(IDLE, RUNNING))
{
- throw new IllegalStateException("Thread is already running");
- }
- sequenceBarrier.clearAlert();
+ sequenceBarrier.clearAlert();
- notifyStart();
+ notifyStart();
+ try
+ {
+ if (running.get() == RUNNING)
+ {
+ processEvents();
+ }
+ }
+ finally
+ {
+ notifyShutdown();
+ running.set(IDLE);
+ }
+ }
+ else
+ {
+ // This is a little bit of guess work. The running state could have changed to HALTED by
+ // this point. However, Java does not have compareAndExchange which is the only way
+ // to get it exactly correct.
+ if (running.get() == RUNNING)
+ {
+ throw new IllegalStateException("Thread is already running");
+ }
+ else
+ {
+ earlyExit();
+ }
+ }
+ }
+ private void processEvents()
+ {
T event = null;
long nextSequence = sequence.get() + 1L;
- try
+
+ while (true)
{
- while (true)
+ try
{
- try
- {
- final long availableSequence = sequenceBarrier.waitFor(nextSequence);
- if (batchStartAware != null)
- {
- batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
- }
-
- while (nextSequence <= availableSequence)
- {
- event = dataProvider.get(nextSequence);
- eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
- nextSequence++;
- }
-
- sequence.set(availableSequence);
- }
- catch (final TimeoutException e)
+ final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+ if (batchStartAware != null)
{
- notifyTimeout(sequence.get());
+ batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
- catch (final AlertException ex)
+
+ while (nextSequence <= availableSequence)
{
- if (!running.get())
- {
- break;
- }
+ event = dataProvider.get(nextSequence);
+ eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
+ nextSequence++;
}
- catch (final Throwable ex)
+
+ sequence.set(availableSequence);
+ }
+ catch (final TimeoutException e)
+ {
+ notifyTimeout(sequence.get());
+ }
+ catch (final AlertException ex)
+ {
+ if (running.get() != RUNNING)
{
- exceptionHandler.handleEventException(ex, nextSequence, event);
- sequence.set(nextSequence);
- nextSequence++;
+ break;
}
}
+ catch (final Throwable ex)
+ {
+ exceptionHandler.handleEventException(ex, nextSequence, event);
+ sequence.set(nextSequence);
+ nextSequence++;
+ }
}
- finally
- {
- notifyShutdown();
- running.set(false);
- }
+ }
+
+ private void earlyExit()
+ {
+ notifyStart();
+ notifyShutdown();
}
private void notifyTimeout(final long availableSequence)
=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -29,23 +29,31 @@ import java.util.concurrent.locks.LockSupport;
public final class SleepingWaitStrategy implements WaitStrategy
{
private static final int DEFAULT_RETRIES = 200;
+ private static final long DEFAULT_SLEEP = 100;
private final int retries;
+ private final long sleepTimeNs;
public SleepingWaitStrategy()
{
- this(DEFAULT_RETRIES);
+ this(DEFAULT_RETRIES, DEFAULT_SLEEP);
}
public SleepingWaitStrategy(int retries)
{
+ this(retries, DEFAULT_SLEEP);
+ }
+
+ public SleepingWaitStrategy(int retries, long sleepTimeNs)
+ {
this.retries = retries;
+ this.sleepTimeNs = sleepTimeNs;
}
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
- throws AlertException, InterruptedException
+ throws AlertException
{
long availableSequence;
int counter = retries;
@@ -79,7 +87,7 @@ public final class SleepingWaitStrategy implements WaitStrategy
}
else
{
- LockSupport.parkNanos(1L);
+ LockSupport.parkNanos(sleepTimeNs);
}
return counter;
=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -187,4 +187,91 @@ public final class BatchEventProcessorTest
assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes);
}
+
+ @Test
+ public void shouldAlwaysHalt() throws InterruptedException
+ {
+ WaitStrategy waitStrategy = new BusySpinWaitStrategy();
+ final SingleProducerSequencer sequencer = new SingleProducerSequencer(8, waitStrategy);
+ final ProcessingSequenceBarrier barrier = new ProcessingSequenceBarrier(
+ sequencer, waitStrategy, new Sequence(-1), new Sequence[0]);
+ DataProvider<Object> dp = new DataProvider<Object>()
+ {
+ @Override
+ public Object get(long sequence)
+ {
+ return null;
+ }
+ };
+
+ final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+
+ Thread t1 = new Thread(p1);
+ p1.halt();
+ t1.start();
+
+ assertTrue(h1.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h1.awaitStop(2, TimeUnit.SECONDS));
+
+ for (int i = 0; i < 1000; i++)
+ {
+ final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ Thread t2 = new Thread(p2);
+ t2.start();
+ p2.halt();
+
+ assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ Thread t2 = new Thread(p2);
+ t2.start();
+ Thread.yield();
+ p2.halt();
+
+ assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+ }
+ }
+
+ private static class LatchLifeCycleHandler implements EventHandler<Object>, LifecycleAware
+ {
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ @Override
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception
+ {
+
+ }
+
+ @Override
+ public void onStart()
+ {
+ startLatch.countDown();
+ }
+
+ @Override
+ public void onShutdown()
+ {
+ stopLatch.countDown();
+ }
+
+ public boolean awaitStart(long time, TimeUnit unit) throws InterruptedException
+ {
+ return startLatch.await(time, unit);
+ }
+
+
+ public boolean awaitStop(long time, TimeUnit unit) throws InterruptedException
+ {
+ return stopLatch.await(time, unit);
+ }
+ }
}
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/6bf5ca156d6ad206d68b168884fe88523f6d6702...be6110967119c94f54f922c68e17dceb725c106f
--
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/6bf5ca156d6ad206d68b168884fe88523f6d6702...be6110967119c94f54f922c68e17dceb725c106f
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180604/1469e672/attachment.html>
More information about the pkg-java-commits mailing list