[disruptor] 01/02: New upstream version 3.3.6
Tony Mancill
tmancill at moszumanska.debian.org
Mon Oct 10 02:54:39 UTC 2016
This is an automated email from the git hooks/post-receive script.
tmancill pushed a commit to branch master
in repository disruptor.
commit d3e725b50ab91f48dbe8cc88b03fd551e9e1ffb0
Author: tony mancill <tmancill at debian.org>
Date: Sun Oct 9 19:44:01 2016 -0700
New upstream version 3.3.6
---
README.md | 12 ++
build.gradle | 2 +-
.../lmax/disruptor/SingleProducerSequencer.java | 4 +
.../java/com/lmax/disruptor/dsl/Disruptor.java | 53 +++++-
.../com/lmax/disruptor/dsl/EventHandlerGroup.java | 5 +-
.../OneToOneTranslatorThroughputTest.java | 13 +-
.../com/lmax/disruptor/DisruptorStressTest.java | 11 +-
.../disruptor/ShutdownOnFatalExceptionTest.java | 4 +-
...ruptorStressTest.java => WorkerStressTest.java} | 46 ++---
.../java/com/lmax/disruptor/dsl/DisruptorTest.java | 211 ++++++++++++++++++---
.../{StubExecutor.java => StubThreadFactory.java} | 27 ++-
.../example/MultiProducerWithTranslator.java | 11 +-
.../lmax/disruptor/example/WaitForShutdown.java | 65 +++++++
13 files changed, 372 insertions(+), 92 deletions(-)
diff --git a/README.md b/README.md
index 330435b..ce0118e 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,18 @@ A High Performance Inter-Thread Messaging Library
## Changelog
+### 3.3.6
+
+- Support adding gating sequences before calling Disruptor.start()
+- Fix minor concurrency race when dynamically adding sequences
+- Fix wrapping problem when adding work handlers to the Disruptor
+
+### 3.3.5
+
+- Fix NPE in TimeoutBlockingWaitStrategy when used with WorkProcessor
+- Add LiteTimeoutBlockingWaitStrategy
+- Resignal any waiting threads when trying to publish to a full ring buffer
+
### 3.3.4
- Small build fixes and refactorings
diff --git a/build.gradle b/build.gradle
index 0a12475..840b791 100644
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks 'build'
group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 5)
+version = new Version(major: 3, minor: 3, revision: 6)
ext {
fullName = 'Disruptor Framework'
diff --git a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
index 50d2bea..35b1b15 100644
--- a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
+++ b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
@@ -79,6 +79,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
+ cursor.setVolatile(nextValue); // StoreLoad fence
+
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
this.cachedValue = minSequence;
@@ -119,6 +121,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
+ cursor.setVolatile(nextValue); // StoreLoad fence
+
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
index eaaca72..c9d8f8f 100644
--- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
@@ -134,9 +134,9 @@ public class Disruptor<T>
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
- this(RingBuffer.create(
- producerType, eventFactory, ringBufferSize, waitStrategy),
- new BasicExecutor(threadFactory));
+ this(
+ RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
+ new BasicExecutor(threadFactory));
}
/**
@@ -204,6 +204,15 @@ public class Disruptor<T>
{
consumerRepository.add(processor);
}
+
+ Sequence[] sequences = new Sequence[processors.length];
+ for (int i = 0; i < processors.length; i++)
+ {
+ sequences[i] = processors[i].getSequence();
+ }
+
+ ringBuffer.addGatingSequences(sequences);
+
return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
}
@@ -349,9 +358,6 @@ public class Disruptor<T>
*/
public RingBuffer<T> start()
{
- final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
- ringBuffer.addGatingSequences(gatingSequences);
-
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
@@ -473,6 +479,17 @@ public class Disruptor<T>
}
/**
+ * Gets the sequence value for the specified event handlers.
+ *
+ * @param b1
+ * @return
+ */
+ public long getSequenceValueFor(EventHandler<T> b1)
+ {
+ return consumerRepository.getSequenceFor(b1).get();
+ }
+
+ /**
* Confirms if all messages have been consumed by all event processors
*/
private boolean hasBacklog()
@@ -513,12 +530,22 @@ public class Disruptor<T>
processorSequences[i] = batchEventProcessor.getSequence();
}
+ updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
+
+ return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
+ }
+
+ private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
+ {
if (processorSequences.length > 0)
{
+ ringBuffer.addGatingSequences(processorSequences);
+ for (final Sequence barrierSequence : barrierSequences)
+ {
+ ringBuffer.removeGatingSequence(barrierSequence);
+ }
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
-
- return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
EventHandlerGroup<T> createEventProcessors(
@@ -529,6 +556,7 @@ public class Disruptor<T>
{
eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences);
}
+
return handleEventsWith(eventProcessors);
}
@@ -537,8 +565,15 @@ public class Disruptor<T>
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
+
+
consumerRepository.add(workerPool, sequenceBarrier);
- return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
+
+ Sequence[] workerSequences = workerPool.getWorkerSequences();
+
+ updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
+
+ return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
private void checkNotStarted()
diff --git a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
index 0acfc92..c8fe91d 100644
--- a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
+++ b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
@@ -54,8 +54,9 @@ public class EventHandlerGroup<T>
{
final Sequence[] combinedSequences = new Sequence[this.sequences.length + otherHandlerGroup.sequences.length];
System.arraycopy(this.sequences, 0, combinedSequences, 0, this.sequences.length);
- System
- .arraycopy(otherHandlerGroup.sequences, 0, combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
+ System.arraycopy(
+ otherHandlerGroup.sequences, 0,
+ combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
diff --git a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
index 94863c8..3c483a8 100644
--- a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
@@ -15,12 +15,6 @@
*/
package com.lmax.disruptor.translator;
-import static com.lmax.disruptor.support.PerfTestUtil.failIfNot;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import com.lmax.disruptor.AbstractPerfTestDisruptor;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
@@ -33,6 +27,10 @@ import com.lmax.disruptor.support.ValueEvent;
import com.lmax.disruptor.util.DaemonThreadFactory;
import com.lmax.disruptor.util.MutableLong;
+import java.util.concurrent.CountDownLatch;
+
+import static com.lmax.disruptor.support.PerfTestUtil.failIfNot;
+
/**
* <pre>
* UniCast a series of items between 1 publisher and 1 event processor using the EventTranslator API
@@ -66,7 +64,6 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr
{
private static final int BUFFER_SIZE = 1024 * 64;
private static final long ITERATIONS = 1000L * 1000L * 100L;
- private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE);
private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
private final RingBuffer<ValueEvent> ringBuffer;
@@ -80,7 +77,7 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(
ValueEvent.EVENT_FACTORY,
- BUFFER_SIZE, executor,
+ BUFFER_SIZE, DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new YieldingWaitStrategy());
disruptor.handleEventsWith(handler);
diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
index 0d885fb..a066b3c 100644
--- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
+++ b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
@@ -2,6 +2,7 @@ package com.lmax.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@@ -23,14 +24,14 @@ public class DisruptorStressTest
public void shouldHandleLotsOfThreads() throws Exception
{
Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(
- TestEvent.FACTORY, 1 << 16, executor,
- ProducerType.MULTI, new BusySpinWaitStrategy());
+ TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE,
+ ProducerType.MULTI, new BusySpinWaitStrategy());
RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
int threads = max(1, Runtime.getRuntime().availableProcessors() / 2);
- int iterations = 20000000;
+ int iterations = 200000;
int publisherCount = threads;
int handlerCount = threads;
@@ -177,9 +178,9 @@ public class DisruptorStressTest
public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>()
{
@Override
- public TestEvent newInstance()
+ public DisruptorStressTest.TestEvent newInstance()
{
- return new TestEvent();
+ return new DisruptorStressTest.TestEvent();
}
};
}
diff --git a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
index 327c9e2..7a26e6e 100644
--- a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
+++ b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
@@ -2,12 +2,12 @@ package com.lmax.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Random;
-import java.util.concurrent.Executors;
public class ShutdownOnFatalExceptionTest
{
@@ -23,7 +23,7 @@ public class ShutdownOnFatalExceptionTest
public void setUp()
{
disruptor = new Disruptor<byte[]>(
- new ByteArrayFactory(256), 1024, Executors.newCachedThreadPool(), ProducerType.SINGLE,
+ new ByteArrayFactory(256), 1024, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE,
new BlockingWaitStrategy());
disruptor.handleEventsWith(eventHandler);
disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/WorkerStressTest.java
similarity index 76%
copy from src/test/java/com/lmax/disruptor/DisruptorStressTest.java
copy to src/test/java/com/lmax/disruptor/WorkerStressTest.java
index 0d885fb..ca5e4c3 100644
--- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
+++ b/src/test/java/com/lmax/disruptor/WorkerStressTest.java
@@ -2,6 +2,7 @@ package com.lmax.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@@ -15,7 +16,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
-public class DisruptorStressTest
+public class WorkerStressTest
{
private final ExecutorService executor = Executors.newCachedThreadPool();
@@ -23,23 +24,25 @@ public class DisruptorStressTest
public void shouldHandleLotsOfThreads() throws Exception
{
Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(
- TestEvent.FACTORY, 1 << 16, executor,
- ProducerType.MULTI, new BusySpinWaitStrategy());
+ TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE,
+ ProducerType.MULTI, new SleepingWaitStrategy());
RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
int threads = max(1, Runtime.getRuntime().availableProcessors() / 2);
- int iterations = 20000000;
+ int iterations = 200000;
int publisherCount = threads;
int handlerCount = threads;
CyclicBarrier barrier = new CyclicBarrier(publisherCount);
CountDownLatch latch = new CountDownLatch(publisherCount);
- TestEventHandler[] handlers = initialise(disruptor, new TestEventHandler[handlerCount]);
+ TestWorkHandler[] handlers = initialise(new TestWorkHandler[handlerCount]);
Publisher[] publishers = initialise(new Publisher[publisherCount], ringBuffer, iterations, barrier, latch);
+ disruptor.handleEventsWithWorkerPool(handlers);
+
disruptor.start();
for (Publisher publisher : publishers)
@@ -60,10 +63,9 @@ public class DisruptorStressTest
assertThat(publisher.failed, is(false));
}
- for (TestEventHandler handler : handlers)
+ for (TestWorkHandler handler : handlers)
{
- assertThat(handler.messagesSeen, is(not(0)));
- assertThat(handler.failureCount, is(0));
+ assertThat(handler.seen, is(not(0)));
}
}
@@ -80,39 +82,25 @@ public class DisruptorStressTest
}
@SuppressWarnings("unchecked")
- private TestEventHandler[] initialise(Disruptor<TestEvent> disruptor, TestEventHandler[] testEventHandlers)
+ private TestWorkHandler[] initialise(TestWorkHandler[] testEventHandlers)
{
for (int i = 0; i < testEventHandlers.length; i++)
{
- TestEventHandler handler = new TestEventHandler();
- disruptor.handleEventsWith(handler);
+ TestWorkHandler handler = new TestWorkHandler();
testEventHandlers[i] = handler;
}
return testEventHandlers;
}
- private static class TestEventHandler implements EventHandler<TestEvent>
+ private static class TestWorkHandler implements WorkHandler<TestEvent>
{
- public int failureCount = 0;
- public int messagesSeen = 0;
-
- public TestEventHandler()
- {
- }
+ private int seen;
@Override
- public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception
+ public void onEvent(TestEvent event) throws Exception
{
- if (event.sequence != sequence ||
- event.a != sequence + 13 ||
- event.b != sequence - 7 ||
- !("wibble-" + sequence).equals(event.s))
- {
- failureCount++;
- }
-
- messagesSeen++;
+ seen++;
}
}
@@ -174,7 +162,7 @@ public class DisruptorStressTest
public long b;
public String s;
- public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>()
+ public static final EventFactory<TestEvent> FACTORY = new EventFactory<WorkerStressTest.TestEvent>()
{
@Override
public TestEvent newInstance()
diff --git a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
index ea008d8..42511fa 100644
--- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
+++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
@@ -32,8 +32,8 @@ import com.lmax.disruptor.dsl.stubs.EvilEqualsEventHandler;
import com.lmax.disruptor.dsl.stubs.ExceptionThrowingEventHandler;
import com.lmax.disruptor.dsl.stubs.SleepingEventHandler;
import com.lmax.disruptor.dsl.stubs.StubExceptionHandler;
-import com.lmax.disruptor.dsl.stubs.StubExecutor;
import com.lmax.disruptor.dsl.stubs.StubPublisher;
+import com.lmax.disruptor.dsl.stubs.StubThreadFactory;
import com.lmax.disruptor.dsl.stubs.TestWorkHandler;
import com.lmax.disruptor.support.TestEvent;
import org.junit.After;
@@ -44,7 +44,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Thread.yield;
@@ -56,15 +57,17 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@SuppressWarnings(value = {"unchecked"})
public class DisruptorTest
{
private static final int TIMEOUT_IN_SECONDS = 2;
- private Disruptor<TestEvent> disruptor;
- private StubExecutor executor;
+
private final Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<DelayedEventHandler>();
private final Collection<TestWorkHandler> testWorkHandlers = new ArrayList<TestWorkHandler>();
+ private Disruptor<TestEvent> disruptor;
+ private StubThreadFactory executor;
private RingBuffer<TestEvent> ringBuffer;
private TestEvent lastPublishedEvent;
@@ -91,6 +94,119 @@ public class DisruptorTest
}
@Test
+ public void shouldProcessMessagesPublishedBeforeStartIsCalled() throws Exception
+ {
+ final CountDownLatch eventCounter = new CountDownLatch(0);
+ disruptor.handleEventsWith(new EventHandler<TestEvent>()
+ {
+ @Override
+ public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
+ {
+ eventCounter.countDown();
+ }
+ });
+
+ disruptor.publishEvent(
+ new EventTranslator<TestEvent>()
+ {
+ @Override
+ public void translateTo(final TestEvent event, final long sequence)
+ {
+ lastPublishedEvent = event;
+ }
+ });
+
+ disruptor.start();
+
+ disruptor.publishEvent(
+ new EventTranslator<TestEvent>()
+ {
+ @Override
+ public void translateTo(final TestEvent event, final long sequence)
+ {
+ lastPublishedEvent = event;
+ }
+ });
+
+ if (!eventCounter.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
+ }
+ }
+
+ @Test
+ public void shouldAddEventProcessorsAfterPublishing() throws Exception
+ {
+ RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+ BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>(
+ rb, rb.newBarrier(), new SleepingEventHandler());
+ BatchEventProcessor<TestEvent> b2 = new BatchEventProcessor<TestEvent>(
+ rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler());
+ BatchEventProcessor<TestEvent> b3 = new BatchEventProcessor<TestEvent>(
+ rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler());
+
+ assertThat(b1.getSequence().get(), is(-1L));
+ assertThat(b2.getSequence().get(), is(-1L));
+ assertThat(b3.getSequence().get(), is(-1L));
+
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+
+ disruptor.handleEventsWith(b1, b2, b3);
+
+ assertThat(b1.getSequence().get(), is(5L));
+ assertThat(b2.getSequence().get(), is(5L));
+ assertThat(b3.getSequence().get(), is(5L));
+ }
+
+ @Test
+ public void shouldSetSequenceForHandlerIfAddedAfterPublish() throws Exception
+ {
+ RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+ EventHandler<TestEvent> b1 = new SleepingEventHandler();
+ EventHandler<TestEvent> b2 = new SleepingEventHandler();
+ EventHandler<TestEvent> b3 = new SleepingEventHandler();
+
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+
+ disruptor.handleEventsWith(b1, b2, b3);
+
+ assertThat(disruptor.getSequenceValueFor(b1), is(5L));
+ assertThat(disruptor.getSequenceValueFor(b2), is(5L));
+ assertThat(disruptor.getSequenceValueFor(b3), is(5L));
+ }
+
+ @Test
+ public void shouldSetSequenceForWorkProcessorIfAddedAfterPublish() throws Exception
+ {
+ RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+ TestWorkHandler wh1 = createTestWorkHandler();
+ TestWorkHandler wh2 = createTestWorkHandler();
+ TestWorkHandler wh3 = createTestWorkHandler();
+
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+ rb.publish(rb.next());
+
+ disruptor.handleEventsWithWorkerPool(wh1, wh2, wh3);
+
+ assertThat(disruptor.getRingBuffer().getMinimumGatingSequence(), is(5L));
+ }
+
+
+ @Test
public void shouldCreateEventProcessorGroupForFirstEventProcessors()
throws Exception
{
@@ -132,6 +248,29 @@ public class DisruptorTest
}
@Test
+ public void should()
+ throws Exception
+ {
+ RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+ BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>(
+ rb, rb.newBarrier(), new SleepingEventHandler());
+ EventProcessorFactory<TestEvent> b2 = new EventProcessorFactory<TestEvent>()
+ {
+ @Override
+ public EventProcessor createEventProcessor(
+ RingBuffer<TestEvent> ringBuffer, Sequence[] barrierSequences)
+ {
+ return new BatchEventProcessor<TestEvent>(
+ ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler());
+ }
+ };
+
+ disruptor.handleEventsWith(b1).then(b2);
+
+ disruptor.start();
+ }
+
+ @Test
public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor()
throws Exception
{
@@ -270,7 +409,7 @@ public class DisruptorTest
final StubPublisher stubPublisher = new StubPublisher(ringBuffer);
try
{
- executor.execute(stubPublisher);
+ executor.newThread(stubPublisher).start();
assertProducerReaches(stubPublisher, 4, true);
@@ -338,8 +477,8 @@ public class DisruptorTest
final BatchEventProcessor<TestEvent> processor =
new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler);
- disruptor.handleEventsWith(processor);
- disruptor.after(processor).handleEventsWith(handlerWithBarrier);
+
+ disruptor.handleEventsWith(processor).then(handlerWithBarrier);
ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
@@ -399,6 +538,28 @@ public class DisruptorTest
workHandler2.processEvent();
}
+
+ @Test
+ public void shouldProvideEventsMultipleWorkHandlers() throws Exception
+ {
+ final TestWorkHandler workHandler1 = createTestWorkHandler();
+ final TestWorkHandler workHandler2 = createTestWorkHandler();
+ final TestWorkHandler workHandler3 = createTestWorkHandler();
+ final TestWorkHandler workHandler4 = createTestWorkHandler();
+ final TestWorkHandler workHandler5 = createTestWorkHandler();
+ final TestWorkHandler workHandler6 = createTestWorkHandler();
+ final TestWorkHandler workHandler7 = createTestWorkHandler();
+ final TestWorkHandler workHandler8 = createTestWorkHandler();
+
+ disruptor
+ .handleEventsWithWorkerPool(workHandler1, workHandler2)
+ .thenHandleEventsWithWorkerPool(workHandler3, workHandler4);
+ disruptor
+ .handleEventsWithWorkerPool(workHandler5, workHandler6)
+ .thenHandleEventsWithWorkerPool(workHandler7, workHandler8);
+ }
+
+
@Test
public void shouldSupportUsingWorkerPoolAsDependency() throws Exception
{
@@ -461,8 +622,10 @@ public class DisruptorTest
final TestWorkHandler workHandler1 = createTestWorkHandler();
final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
- disruptor.handleEventsWith(delayedEventHandler1).and(disruptor.handleEventsWithWorkerPool(workHandler1)).then(
- delayedEventHandler2);
+ disruptor
+ .handleEventsWith(delayedEventHandler1)
+ .and(disruptor.handleEventsWithWorkerPool(workHandler1))
+ .then(delayedEventHandler2);
publishEvent();
publishEvent();
@@ -549,18 +712,18 @@ public class DisruptorTest
final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
disruptor.handleEventsWith(
- new EventProcessorFactory<TestEvent>()
- {
- @Override
- public EventProcessor createEventProcessor(
- final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
- {
- assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
- return new BatchEventProcessor<TestEvent>(
- disruptor.getRingBuffer(), ringBuffer.newBarrier(
- barrierSequences), eventHandler);
- }
- });
+ new EventProcessorFactory<TestEvent>()
+ {
+ @Override
+ public EventProcessor createEventProcessor(
+ final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
+ {
+ assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
+ return new BatchEventProcessor<TestEvent>(
+ disruptor.getRingBuffer(), ringBuffer.newBarrier(
+ barrierSequences), eventHandler);
+ }
+ });
ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
}
@@ -641,14 +804,14 @@ public class DisruptorTest
private void createDisruptor()
{
- executor = new StubExecutor();
+ executor = new StubThreadFactory();
createDisruptor(executor);
}
- private void createDisruptor(final Executor executor)
+ private void createDisruptor(final ThreadFactory threadFactory)
{
disruptor = new Disruptor<TestEvent>(
- TestEvent.EVENT_FACTORY, 4, executor,
+ TestEvent.EVENT_FACTORY, 4, threadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
}
diff --git a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
similarity index 77%
rename from src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java
rename to src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
index 1167340..0e160e2 100644
--- a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java
+++ b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
@@ -20,27 +20,30 @@ import org.junit.Assert;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-public class StubExecutor implements Executor
+public final class StubThreadFactory implements ThreadFactory
{
private final DaemonThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
private final Collection<Thread> threads = new CopyOnWriteArrayList<Thread>();
private final AtomicBoolean ignoreExecutions = new AtomicBoolean(false);
private final AtomicInteger executionCount = new AtomicInteger(0);
- public void execute(final Runnable command)
+ @Override
+ public Thread newThread(final Runnable command)
{
executionCount.getAndIncrement();
- if (!ignoreExecutions.get())
+ Runnable toExecute = command;
+ if(ignoreExecutions.get())
{
- Thread t = threadFactory.newThread(command);
- t.setName(command.toString());
- threads.add(t);
- t.start();
+ toExecute = new NoOpRunnable();
}
+ final Thread thread = threadFactory.newThread(toExecute);
+ thread.setName(command.toString());
+ threads.add(thread);
+ return thread;
}
public void joinAllThreads()
@@ -75,4 +78,12 @@ public class StubExecutor implements Executor
{
return executionCount.get();
}
+
+ private static final class NoOpRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ }
+ }
}
diff --git a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
index 71727aa..7102523 100644
--- a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
+++ b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
@@ -1,10 +1,13 @@
package com.lmax.disruptor.example;
-import com.lmax.disruptor.*;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslatorThreeArg;
+import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
-
-import java.util.concurrent.Executors;
+import com.lmax.disruptor.util.DaemonThreadFactory;
public class MultiProducerWithTranslator
{
@@ -72,7 +75,7 @@ public class MultiProducerWithTranslator
public static void main(String[] args) throws InterruptedException
{
Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>(
- ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI,
+ ObjectBox.FACTORY, RING_SIZE, DaemonThreadFactory.INSTANCE, ProducerType.MULTI,
new BlockingWaitStrategy());
disruptor.handleEventsWith(new Consumer()).then(new Consumer());
final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer();
diff --git a/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java
new file mode 100644
index 0000000..ab2ffe0
--- /dev/null
+++ b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java
@@ -0,0 +1,65 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.support.LongEvent;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class WaitForShutdown
+{
+ private static volatile int value = 0;
+
+ private static class Handler implements EventHandler<LongEvent>, LifecycleAware
+ {
+ private final CountDownLatch latch;
+
+ public Handler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onStart()
+ {
+ }
+
+ @Override
+ public void onShutdown()
+ {
+ latch.countDown();
+ }
+
+ @Override
+ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception
+ {
+ value = 1;
+ }
+ }
+
+ public static void main(String[] args) throws TimeoutException, InterruptedException
+ {
+ Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
+ LongEvent.FACTORY, 16, DaemonThreadFactory.INSTANCE
+ );
+
+ CountDownLatch shutdownLatch = new CountDownLatch(2);
+
+ disruptor.handleEventsWith(new Handler(shutdownLatch)).then(new Handler(shutdownLatch));
+ disruptor.start();
+
+ long next = disruptor.getRingBuffer().next();
+ disruptor.getRingBuffer().get(next).set(next);
+ disruptor.getRingBuffer().publish(next);
+
+ disruptor.shutdown(10, TimeUnit.SECONDS);
+
+ shutdownLatch.await();
+
+ System.out.println(value);
+ }
+}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/disruptor.git
More information about the pkg-java-commits mailing list