[Git][java-team/disruptor][upstream] 3 commits: New upstream version 3.3.9
Emmanuel Bourg
gitlab at salsa.debian.org
Mon Jun 4 21:48:00 BST 2018
Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / disruptor
Commits:
7534ec40 by Emmanuel Bourg at 2018-06-04T22:34:04+02:00
New upstream version 3.3.9
- - - - -
d19d76c0 by Emmanuel Bourg at 2018-06-04T22:34:30+02:00
New upstream version 3.3.10
- - - - -
2b92f9b2 by Emmanuel Bourg at 2018-06-04T22:34:49+02:00
New upstream version 3.3.11
- - - - -
5 changed files:
- README.md
- build.gradle
- src/main/java/com/lmax/disruptor/BatchEventProcessor.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
Changes:
=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,6 +15,18 @@ A High Performance Inter-Thread Messaging Library
## Changelog
+### 3.3.11
+
+- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
+
+### 3.3.10
+
+- Fix race condition in BatchEventProcessor between run() and halt().
+
+### 3.3.9
+
+- Change SleepingWaitStrategy to use a parkNanos(100).
+
### 3.3.8
- Revert belt and braces WaitStategy signalling.
=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks 'build'
group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 8)
+version = new Version(major: 3, minor: 3, revision: 11)
ext {
fullName = 'Disruptor Framework'
=====================================
src/main/java/com/lmax/disruptor/BatchEventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
@@ -15,7 +15,7 @@
*/
package com.lmax.disruptor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -30,7 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public final class BatchEventProcessor<T>
implements EventProcessor
{
- private final AtomicBoolean running = new AtomicBoolean(false);
+ private static final int IDLE = 0;
+ private static final int HALTED = IDLE + 1;
+ private static final int RUNNING = HALTED + 1;
+
+ private final AtomicInteger running = new AtomicInteger(IDLE);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
@@ -62,9 +66,9 @@ public final class BatchEventProcessor<T>
}
batchStartAware =
- (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
+ (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
- (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
+ (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
@Override
@@ -76,14 +80,14 @@ public final class BatchEventProcessor<T>
@Override
public void halt()
{
- running.set(false);
+ running.set(HALTED);
sequenceBarrier.alert();
}
@Override
public boolean isRunning()
{
- return running.get();
+ return running.get() != IDLE;
}
/**
@@ -109,61 +113,88 @@ public final class BatchEventProcessor<T>
@Override
public void run()
{
- if (!running.compareAndSet(false, true))
+ if (running.compareAndSet(IDLE, RUNNING))
{
- throw new IllegalStateException("Thread is already running");
- }
- sequenceBarrier.clearAlert();
+ sequenceBarrier.clearAlert();
- notifyStart();
+ notifyStart();
+ try
+ {
+ if (running.get() == RUNNING)
+ {
+ processEvents();
+ }
+ }
+ finally
+ {
+ notifyShutdown();
+ running.set(IDLE);
+ }
+ }
+ else
+ {
+ // This is a little bit of guess work. The running state could of changed to HALTED by
+ // this point. However, Java does not have compareAndExchange which is the only way
+ // to get it exactly correct.
+ if (running.get() == RUNNING)
+ {
+ throw new IllegalStateException("Thread is already running");
+ }
+ else
+ {
+ earlyExit();
+ }
+ }
+ }
+ private void processEvents()
+ {
T event = null;
long nextSequence = sequence.get() + 1L;
- try
+
+ while (true)
{
- while (true)
+ try
{
- try
- {
- final long availableSequence = sequenceBarrier.waitFor(nextSequence);
- if (batchStartAware != null)
- {
- batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
- }
-
- while (nextSequence <= availableSequence)
- {
- event = dataProvider.get(nextSequence);
- eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
- nextSequence++;
- }
-
- sequence.set(availableSequence);
- }
- catch (final TimeoutException e)
+ final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+ if (batchStartAware != null)
{
- notifyTimeout(sequence.get());
+ batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
- catch (final AlertException ex)
+
+ while (nextSequence <= availableSequence)
{
- if (!running.get())
- {
- break;
- }
+ event = dataProvider.get(nextSequence);
+ eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
+ nextSequence++;
}
- catch (final Throwable ex)
+
+ sequence.set(availableSequence);
+ }
+ catch (final TimeoutException e)
+ {
+ notifyTimeout(sequence.get());
+ }
+ catch (final AlertException ex)
+ {
+ if (running.get() != RUNNING)
{
- exceptionHandler.handleEventException(ex, nextSequence, event);
- sequence.set(nextSequence);
- nextSequence++;
+ break;
}
}
+ catch (final Throwable ex)
+ {
+ exceptionHandler.handleEventException(ex, nextSequence, event);
+ sequence.set(nextSequence);
+ nextSequence++;
+ }
}
- finally
- {
- notifyShutdown();
- running.set(false);
- }
+ }
+
+ private void earlyExit()
+ {
+ notifyStart();
+ notifyShutdown();
}
private void notifyTimeout(final long availableSequence)
=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -29,23 +29,31 @@ import java.util.concurrent.locks.LockSupport;
public final class SleepingWaitStrategy implements WaitStrategy
{
private static final int DEFAULT_RETRIES = 200;
+ private static final long DEFAULT_SLEEP = 100;
private final int retries;
+ private final long sleepTimeNs;
public SleepingWaitStrategy()
{
- this(DEFAULT_RETRIES);
+ this(DEFAULT_RETRIES, DEFAULT_SLEEP);
}
public SleepingWaitStrategy(int retries)
{
+ this(retries, DEFAULT_SLEEP);
+ }
+
+ public SleepingWaitStrategy(int retries, long sleepTimeNs)
+ {
this.retries = retries;
+ this.sleepTimeNs = sleepTimeNs;
}
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
- throws AlertException, InterruptedException
+ throws AlertException
{
long availableSequence;
int counter = retries;
@@ -79,7 +87,7 @@ public final class SleepingWaitStrategy implements WaitStrategy
}
else
{
- LockSupport.parkNanos(1L);
+ LockSupport.parkNanos(sleepTimeNs);
}
return counter;
=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -187,4 +187,91 @@ public final class BatchEventProcessorTest
assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes);
}
+
+ @Test
+ public void shouldAlwaysHalt() throws InterruptedException
+ {
+ WaitStrategy waitStrategy = new BusySpinWaitStrategy();
+ final SingleProducerSequencer sequencer = new SingleProducerSequencer(8, waitStrategy);
+ final ProcessingSequenceBarrier barrier = new ProcessingSequenceBarrier(
+ sequencer, waitStrategy, new Sequence(-1), new Sequence[0]);
+ DataProvider<Object> dp = new DataProvider<Object>()
+ {
+ @Override
+ public Object get(long sequence)
+ {
+ return null;
+ }
+ };
+
+ final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+
+ Thread t1 = new Thread(p1);
+ p1.halt();
+ t1.start();
+
+ assertTrue(h1.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h1.awaitStop(2, TimeUnit.SECONDS));
+
+ for (int i = 0; i < 1000; i++)
+ {
+ final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ Thread t2 = new Thread(p2);
+ t2.start();
+ p2.halt();
+
+ assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
+ final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ Thread t2 = new Thread(p2);
+ t2.start();
+ Thread.yield();
+ p2.halt();
+
+ assertTrue(h2.awaitStart(2, TimeUnit.SECONDS));
+ assertTrue(h2.awaitStop(2, TimeUnit.SECONDS));
+ }
+ }
+
+ private static class LatchLifeCycleHandler implements EventHandler<Object>, LifecycleAware
+ {
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ @Override
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception
+ {
+
+ }
+
+ @Override
+ public void onStart()
+ {
+ startLatch.countDown();
+ }
+
+ @Override
+ public void onShutdown()
+ {
+ stopLatch.countDown();
+ }
+
+ public boolean awaitStart(long time, TimeUnit unit) throws InterruptedException
+ {
+ return startLatch.await(time, unit);
+ }
+
+
+ public boolean awaitStop(long time, TimeUnit unit) throws InterruptedException
+ {
+ return stopLatch.await(time, unit);
+ }
+ }
}
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/c26cf52ca4f2c39e3994959ee800bba6b5062b60...2b92f9b235ea7ce41b6ec941c2a1451c3b9c9214
--
View it on GitLab: https://salsa.debian.org/java-team/disruptor/compare/c26cf52ca4f2c39e3994959ee800bba6b5062b60...2b92f9b235ea7ce41b6ec941c2a1451c3b9c9214
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180604/c2cc7d25/attachment.html>
More information about the pkg-java-commits
mailing list