[Git][java-team/disruptor][upstream] New upstream version 3.4.2
Tony Mancill
gitlab at salsa.debian.org
Mon Jun 25 06:18:54 BST 2018
Tony Mancill pushed to branch upstream at Debian Java Maintainers / disruptor
Commits:
fa9451b4 by tony mancill at 2018-06-24T10:54:50-07:00
New upstream version 3.4.2
- - - - -
30 changed files:
- README.md
- build.gradle
- runoffheap.sh
- src/main/java/com/lmax/disruptor/AggregateEventHandler.java
- src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
- src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
- src/main/java/com/lmax/disruptor/EventHandler.java
- src/main/java/com/lmax/disruptor/EventProcessor.java
- − src/main/java/com/lmax/disruptor/Foo.java
- src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
- src/main/java/com/lmax/disruptor/SequenceGroup.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/main/java/com/lmax/disruptor/WorkerPool.java
- src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
- − src/main/java/com/lmax/disruptor/collections/Histogram.java
- src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
- src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
- src/main/java/com/lmax/disruptor/dsl/Disruptor.java
- src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
- src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
- + src/main/java/com/lmax/disruptor/util/ThreadHints.java
- src/main/java/com/lmax/disruptor/util/Util.java
- src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
- src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
- src/test/java/com/lmax/disruptor/RingBufferTest.java
- − src/test/java/com/lmax/disruptor/collections/HistogramTest.java
- src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
- + src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
- src/test/java/com/lmax/disruptor/util/UtilTest.java
Changes:
=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,21 +15,23 @@ A High Performance Inter-Thread Messaging Library
## Changelog
-### 3.3.11
+### 3.4.2
- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
-### 3.3.10
+### 3.4.1
-- Fix race condition in BatchEventProcessor between run() and halt().
+ - Fix race between run() and halt() on BatchEventProcessor.
-### 3.3.9
+### 3.4.0
-- Change SleepingWaitStrategy to use a parkNanos(100).
+ - Drop support for JDK6, support JDK7 and above only.
+ - Add `ThreadHints.onSpinWait` to all busy spins within Disruptor.
+ - Increase default sleep time for LockSupport.parkNanos to prevent busy spinning.
### 3.3.8
-- Revert belt and braces WaitStategy signalling.
+- Revert belt and braces WaitStrategy signalling.
### 3.3.7
=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks 'build'
group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 11)
+version = new Version(major: 3, minor: 4, revision: 2)
ext {
fullName = 'Disruptor Framework'
@@ -61,8 +61,8 @@ idea.module {
scopes.TEST.plus += [ configurations.perfCompile ]
}
-sourceCompatibility = 1.6
-targetCompatibility = 1.6
+sourceCompatibility = 1.7
+targetCompatibility = 1.7
compileJava {
@@ -175,7 +175,7 @@ task perfJar(type: Jar) {
}
task wrapper(type: Wrapper) {
- gradleVersion = '4.2'
+ gradleVersion = '4.3'
}
class Version {
=====================================
runoffheap.sh
=====================================
--- a/runoffheap.sh
+++ b/runoffheap.sh
@@ -14,4 +14,8 @@ echo "Done"
echo "Running OnHeap..."
$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
echo "Done"
+
+echo "Running Sliced OnHeap..."
+$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH -Dsliced=true com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
+echo "Done"
=====================================
src/main/java/com/lmax/disruptor/AggregateEventHandler.java
=====================================
--- a/src/main/java/com/lmax/disruptor/AggregateEventHandler.java
+++ b/src/main/java/com/lmax/disruptor/AggregateEventHandler.java
@@ -30,6 +30,7 @@ public final class AggregateEventHandler<T>
*
* @param eventHandlers to be called in sequence.
*/
+ @SafeVarargs
public AggregateEventHandler(final EventHandler<T>... eventHandlers)
{
this.eventHandlers = eventHandlers;
=====================================
src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
@@ -19,6 +19,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.lmax.disruptor.util.ThreadHints;
+
/**
* Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
* <p>
@@ -54,6 +56,7 @@ public final class BlockingWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
+ ThreadHints.onSpinWait();
}
return availableSequence;
=====================================
src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
@@ -16,6 +16,8 @@
package com.lmax.disruptor;
+import com.lmax.disruptor.util.ThreadHints;
+
/**
* Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
* <p>
@@ -34,6 +36,7 @@ public final class BusySpinWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
+ ThreadHints.onSpinWait();
}
return availableSequence;
=====================================
src/main/java/com/lmax/disruptor/EventHandler.java
=====================================
--- a/src/main/java/com/lmax/disruptor/EventHandler.java
+++ b/src/main/java/com/lmax/disruptor/EventHandler.java
@@ -24,7 +24,12 @@ package com.lmax.disruptor;
public interface EventHandler<T>
{
/**
- * Called when a publisher has published an event to the {@link RingBuffer}
+ * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
+ * read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
+ * processed without having to wait for any new event to arrive. This can be useful for event handlers that need
+ * to do slower operations like I/O as they can group together the data from multiple events into a single
+ * operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
+ * the time between that message an the next one is inderminate.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
=====================================
src/main/java/com/lmax/disruptor/EventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/EventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/EventProcessor.java
@@ -16,7 +16,10 @@
package com.lmax.disruptor;
/**
- * EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
+ * An EventProcessor needs to be an implementation of a runnable that will poll for events from the {@link RingBuffer}
+ * using the appropriate wait strategy. It is unlikely that you will need to implement this interface yourself.
+ * Look at using the {@link EventHandler} interface along with the pre-supplied BatchEventProcessor in the first
+ * instance.
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
*/
=====================================
src/main/java/com/lmax/disruptor/Foo.java deleted
=====================================
--- a/src/main/java/com/lmax/disruptor/Foo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.lmax.disruptor;
-
-/**
- * Created by barkerm on 13/06/17.
- */
-public class Foo
-{
- int a;
- int b;
- short c;
- short d;
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Foo foo = (Foo) o;
-
- if (a != foo.a) return false;
- if (b != foo.b) return false;
- if (c != foo.c) return false;
- return d == foo.d;
- }
-
- @Override
- public int hashCode()
- {
- int result = a;
- result = 31 * result + b;
- result = 31 * result + (int) c;
- result = 31 * result + (int) d;
- return result;
- }
-}
=====================================
src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
@@ -20,6 +20,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.lmax.disruptor.util.ThreadHints;
+
/**
* Variation of the {@link BlockingWaitStrategy} that attempts to elide conditional wake-ups when
* the lock is uncontended. Shows performance improvements on microbenchmarks. However this
@@ -66,6 +68,7 @@ public final class LiteBlockingWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
+ ThreadHints.onSpinWait();
}
return availableSequence;
=====================================
src/main/java/com/lmax/disruptor/SequenceGroup.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SequenceGroup.java
+++ b/src/main/java/com/lmax/disruptor/SequenceGroup.java
@@ -60,9 +60,9 @@ public final class SequenceGroup extends Sequence
public void set(final long value)
{
final Sequence[] sequences = this.sequences;
- for (int i = 0, size = sequences.length; i < size; i++)
+ for (Sequence sequence : sequences)
{
- sequences[i].set(value);
+ sequence.set(value);
}
}
=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -19,12 +19,14 @@ import java.util.concurrent.locks.LockSupport;
/**
* Sleeping strategy that initially spins, then uses a Thread.yield(), and
- * eventually sleep (<code>LockSupport.parkNanos(1)</code>) for the minimum
+ * eventually sleep (<code>LockSupport.parkNanos(n)</code>) for the minimum
* number of nanos the OS and JVM will allow while the
* {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
* <p>
* This strategy is a good compromise between performance and CPU resource.
- * Latency spikes can occur after quiet periods.
+ * Latency spikes can occur after quiet periods. It will also reduce the impact
+ * on the producing thread as it will not need signal any conditional variables
+ * to wake up the event handling thread.
*/
public final class SleepingWaitStrategy implements WaitStrategy
{
=====================================
src/main/java/com/lmax/disruptor/WorkerPool.java
=====================================
--- a/src/main/java/com/lmax/disruptor/WorkerPool.java
+++ b/src/main/java/com/lmax/disruptor/WorkerPool.java
@@ -45,6 +45,7 @@ public final class WorkerPool<T>
* @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
* @param workHandlers to distribute the work load across.
*/
+ @SafeVarargs
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
@@ -57,7 +58,7 @@ public final class WorkerPool<T>
for (int i = 0; i < numWorkers; i++)
{
- workProcessors[i] = new WorkProcessor<T>(
+ workProcessors[i] = new WorkProcessor<>(
ringBuffer,
sequenceBarrier,
workHandlers[i],
@@ -75,6 +76,7 @@ public final class WorkerPool<T>
* @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
* @param workHandlers to distribute the work load across.
*/
+ @SafeVarargs
public WorkerPool(
final EventFactory<T> eventFactory,
final ExceptionHandler<? super T> exceptionHandler,
@@ -87,7 +89,7 @@ public final class WorkerPool<T>
for (int i = 0; i < numWorkers; i++)
{
- workProcessors[i] = new WorkProcessor<T>(
+ workProcessors[i] = new WorkProcessor<>(
ringBuffer,
barrier,
workHandlers[i],
=====================================
src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
@@ -20,7 +20,8 @@ package com.lmax.disruptor;
* Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
* after an initially spinning.
* <p>
- * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
+ * This strategy will use 100% CPU, but will more readily give up the CPU than a busy spin strategy if other threads
+ * require CPU resource.
*/
public final class YieldingWaitStrategy implements WaitStrategy
{
=====================================
src/main/java/com/lmax/disruptor/collections/Histogram.java deleted
=====================================
--- a/src/main/java/com/lmax/disruptor/collections/Histogram.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Copyright 2011 LMAX Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.lmax.disruptor.collections;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.Arrays;
-
-/**
- * <p>Histogram for tracking the frequency of observations of values below interval upper bounds.</p>
- *
- * <p>This class is useful for recording timings across a large number of observations
- * when high performance is required.</p>
- *
- * <p>The interval bounds are used to define the ranges of the histogram buckets. If provided bounds
- * are [10,20,30,40,50] then there will be five buckets, accessible by index 0-4. Any value
- * 0-10 will fall into the first interval bar, values 11-20 will fall into the
- * second bar, and so on.</p>
- */
-@Deprecated
-public final class Histogram
-{
- // tracks the upper intervals of each of the buckets/bars
- private final long[] upperBounds;
- // tracks the count of the corresponding bucket
- private final long[] counts;
- // minimum value so far observed
- private long minValue = Long.MAX_VALUE;
- // maximum value so far observed
- private long maxValue = 0L;
-
- /**
- * Create a new Histogram with a provided list of interval bounds.
- *
- * @param upperBounds of the intervals. Bounds must be provided in order least to greatest, and
- * lowest bound must be greater than or equal to 1.
- * @throws IllegalArgumentException if any of the upper bounds are less than or equal to zero
- * @throws IllegalArgumentException if the bounds are not in order, least to greatest
- */
- public Histogram(final long[] upperBounds)
- {
- validateBounds(upperBounds);
-
- this.upperBounds = Arrays.copyOf(upperBounds, upperBounds.length);
- this.counts = new long[upperBounds.length];
- }
-
- /**
- * Validates the input bounds; used by constructor only.
- */
- private void validateBounds(final long[] upperBounds)
- {
- long lastBound = -1L;
- if (upperBounds.length <= 0)
- {
- throw new IllegalArgumentException("Must provide at least one interval");
- }
- for (final long bound : upperBounds)
- {
- if (bound <= 0L)
- {
- throw new IllegalArgumentException("Bounds must be positive values");
- }
-
- if (bound <= lastBound)
- {
- throw new IllegalArgumentException("bound " + bound + " is not greater than " + lastBound);
- }
-
- lastBound = bound;
- }
- }
-
- /**
- * Size of the list of interval bars (ie: count of interval bars).
- *
- * @return size of the interval bar list.
- */
- public int getSize()
- {
- return upperBounds.length;
- }
-
- /**
- * Get the upper bound of an interval for an index.
- *
- * @param index of the upper bound.
- * @return the interval upper bound for the index.
- */
- public long getUpperBoundAt(final int index)
- {
- return upperBounds[index];
- }
-
- /**
- * Get the count of observations at a given index.
- *
- * @param index of the observations counter.
- * @return the count of observations at a given index.
- */
- public long getCountAt(final int index)
- {
- return counts[index];
- }
-
- /**
- * Add an observation to the histogram and increment the counter for the interval it matches.
- *
- * @param value for the observation to be added.
- * @return return true if in the range of intervals and successfully added observation; otherwise false.
- */
- public boolean addObservation(final long value)
- {
- int low = 0;
- int high = upperBounds.length - 1;
-
- // do a classic binary search to find the high value
- while (low < high)
- {
- int mid = low + ((high - low) >> 1);
- if (upperBounds[mid] < value)
- {
- low = mid + 1;
- }
- else
- {
- high = mid;
- }
- }
-
- // if the binary search found an eligible bucket, increment
- if (value <= upperBounds[high])
- {
- counts[high]++;
- trackRange(value);
-
- return true;
- }
-
- // otherwise value was not found
- return false;
- }
-
- /**
- * Track minimum and maximum observations
- */
- private void trackRange(final long value)
- {
- if (value < minValue)
- {
- minValue = value;
- }
-
- if (value > maxValue)
- {
- maxValue = value;
- }
- }
-
- /**
- * <p>Add observations from another Histogram into this one.</p>
- *
- * <p>Histograms must have the same intervals.</p>
- *
- * @param histogram from which to add the observation counts.
- * @throws IllegalArgumentException if interval count or values do not match exactly
- */
- public void addObservations(final Histogram histogram)
- {
- // validate the intervals
- if (upperBounds.length != histogram.upperBounds.length)
- {
- throw new IllegalArgumentException("Histograms must have matching intervals");
- }
-
- for (int i = 0, size = upperBounds.length; i < size; i++)
- {
- if (upperBounds[i] != histogram.upperBounds[i])
- {
- throw new IllegalArgumentException("Histograms must have matching intervals");
- }
- }
-
- // increment all of the internal counts
- for (int i = 0, size = counts.length; i < size; i++)
- {
- counts[i] += histogram.counts[i];
- }
-
- // refresh the minimum and maximum observation ranges
- trackRange(histogram.minValue);
- trackRange(histogram.maxValue);
- }
-
- /**
- * Clear the list of interval counters
- */
- public void clear()
- {
- maxValue = 0L;
- minValue = Long.MAX_VALUE;
-
- for (int i = 0, size = counts.length; i < size; i++)
- {
- counts[i] = 0L;
- }
- }
-
- /**
- * Count total number of recorded observations.
- *
- * @return the total number of recorded observations.
- */
- public long getCount()
- {
- long count = 0L;
-
- for (int i = 0, size = counts.length; i < size; i++)
- {
- count += counts[i];
- }
-
- return count;
- }
-
- /**
- * Get the minimum observed value.
- *
- * @return the minimum value observed.
- */
- public long getMin()
- {
- return minValue;
- }
-
- /**
- * Get the maximum observed value.
- *
- * @return the maximum of the observed values;
- */
- public long getMax()
- {
- return maxValue;
- }
-
- /**
- * <p>Calculate the mean of all recorded observations.</p>
- *
- * <p>The mean is calculated by summing the mid points of each interval multiplied by the count
- * for that interval, then dividing by the total count of observations. The max and min are
- * considered for adjusting the top and bottom bin when calculating the mid point, this
- * minimises skew if the observed values are very far away from the possible histogram values.</p>
- *
- * @return the mean of all recorded observations.
- */
- public BigDecimal getMean()
- {
- // early exit to avoid divide by zero later
- if (0L == getCount())
- {
- return BigDecimal.ZERO;
- }
-
- // precalculate the initial lower bound; needed in the loop
- long lowerBound = counts[0] > 0L ? minValue : 0L;
- // use BigDecimal to avoid precision errors
- BigDecimal total = BigDecimal.ZERO;
-
- // midpoint is calculated as the average between the lower and upper bound
- // (after taking into account the min & max values seen)
- // then, simply multiply midpoint by the count of values at the interval (intervalTotal)
- // and add to running total (total)
- for (int i = 0, size = upperBounds.length; i < size; i++)
- {
- if (0L != counts[i])
- {
- long upperBound = Math.min(upperBounds[i], maxValue);
- long midPoint = lowerBound + ((upperBound - lowerBound) / 2L);
-
- BigDecimal intervalTotal = new BigDecimal(midPoint).multiply(new BigDecimal(counts[i]));
- total = total.add(intervalTotal);
- }
-
- // and recalculate the lower bound for the next time around the loop
- lowerBound = Math.max(upperBounds[i] + 1L, minValue);
- }
-
- return total.divide(new BigDecimal(getCount()), 2, RoundingMode.HALF_UP);
- }
-
- /**
- * Calculate the upper bound within which 99% of observations fall.
- *
- * @return the upper bound for 99% of observations.
- */
- public long getTwoNinesUpperBound()
- {
- return getUpperBoundForFactor(0.99d);
- }
-
- /**
- * Calculate the upper bound within which 99.99% of observations fall.
- *
- * @return the upper bound for 99.99% of observations.
- */
- public long getFourNinesUpperBound()
- {
- return getUpperBoundForFactor(0.9999d);
- }
-
- /**
- * <p>Get the interval upper bound for a given factor of the observation population.</p>
- *
- * <p>Note this does not get the actual percentile measurement, it only gets the bucket</p>
- *
- * @param factor representing the size of the population.
- * @return the interval upper bound.
- * @throws IllegalArgumentException if factor < 0.0 or factor > 1.0
- */
- public long getUpperBoundForFactor(final double factor)
- {
- if (0.0d >= factor || factor >= 1.0d)
- {
- throw new IllegalArgumentException("factor must be >= 0.0 and <= 1.0");
- }
-
- final long totalCount = getCount();
- final long tailTotal = totalCount - Math.round(totalCount * factor);
- long tailCount = 0L;
-
- // reverse search the intervals ('tailCount' from end)
- for (int i = counts.length - 1; i >= 0; i--)
- {
- if (0L != counts[i])
- {
- tailCount += counts[i];
- if (tailCount >= tailTotal)
- {
- return upperBounds[i];
- }
- }
- }
-
- return 0L;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
-
- sb.append("Histogram{");
-
- sb.append("min=").append(getMin()).append(", ");
- sb.append("max=").append(getMax()).append(", ");
- sb.append("mean=").append(getMean()).append(", ");
- sb.append("99%=").append(getTwoNinesUpperBound()).append(", ");
- sb.append("99.99%=").append(getFourNinesUpperBound()).append(", ");
-
- sb.append('[');
- for (int i = 0, size = counts.length; i < size; i++)
- {
- sb.append(upperBounds[i]).append('=').append(counts[i]).append(", ");
- }
-
- if (counts.length > 0)
- {
- sb.setLength(sb.length() - 2);
- }
- sb.append(']');
-
- sb.append('}');
-
- return sb.toString();
- }
-}
=====================================
src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
@@ -11,7 +11,7 @@ import java.util.concurrent.ThreadFactory;
public class BasicExecutor implements Executor
{
private final ThreadFactory factory;
- private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
+ private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
public BasicExecutor(ThreadFactory factory)
{
=====================================
src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
+++ b/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
@@ -27,17 +27,17 @@ import java.util.*;
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
- new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
+ new IdentityHashMap<>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
- new IdentityHashMap<Sequence, ConsumerInfo>();
- private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
+ new IdentityHashMap<>();
+ private final Collection<ConsumerInfo> consumerInfos = new ArrayList<>();
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
- final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
+ final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
@@ -45,14 +45,14 @@ class ConsumerRepository<T> implements Iterable<ConsumerInfo>
public void add(final EventProcessor processor)
{
- final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(processor, null, null);
+ final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<>(processor, null, null);
eventProcessorInfoBySequence.put(processor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier)
{
- final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<T>(workerPool, sequenceBarrier);
+ final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
consumerInfos.add(workerPoolInfo);
for (Sequence sequence : workerPool.getWorkerSequences())
{
@@ -62,7 +62,7 @@ class ConsumerRepository<T> implements Iterable<ConsumerInfo>
public Sequence[] getLastSequenceInChain(boolean includeStopped)
{
- List<Sequence> lastSequence = new ArrayList<Sequence>();
+ List<Sequence> lastSequence = new ArrayList<>();
for (ConsumerInfo consumerInfo : consumerInfos)
{
if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain())
=====================================
src/main/java/com/lmax/disruptor/dsl/Disruptor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
@@ -60,9 +60,9 @@ public class Disruptor<T>
{
private final RingBuffer<T> ringBuffer;
private final Executor executor;
- private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
+ private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
- private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
+ private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
/**
* Create a new Disruptor. Will default to {@link com.lmax.disruptor.BlockingWaitStrategy} and
@@ -155,11 +155,14 @@ public class Disruptor<T>
* process events before handler <code>B</code>:</p>
* <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
*
+ * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
+ *
* @param handlers the event handlers that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
*/
@SuppressWarnings("varargs")
- public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
+ @SafeVarargs
+ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
@@ -177,10 +180,13 @@ public class Disruptor<T>
* {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}
* which do have barrier sequences to provide.</p>
*
+ * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
+ *
* @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
*/
- public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
+ @SafeVarargs
+ public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
{
final Sequence[] barrierSequences = new Sequence[0];
return createEventProcessors(barrierSequences, eventProcessorFactories);
@@ -212,7 +218,7 @@ public class Disruptor<T>
ringBuffer.addGatingSequences(sequences);
- return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
+ return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
}
@@ -224,8 +230,9 @@ public class Disruptor<T>
* @param workHandlers the work handlers that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
*/
+ @SafeVarargs
@SuppressWarnings("varargs")
- public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
+ public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
@@ -270,7 +277,7 @@ public class Disruptor<T>
*/
public ExceptionHandlerSetting<T> handleExceptionsFor(final EventHandler<T> eventHandler)
{
- return new ExceptionHandlerSetting<T>(eventHandler, consumerRepository);
+ return new ExceptionHandlerSetting<>(eventHandler, consumerRepository);
}
/**
@@ -283,8 +290,9 @@ public class Disruptor<T>
* that will form the barrier for subsequent handlers or processors.
* @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers.
*/
+ @SafeVarargs
@SuppressWarnings("varargs")
- public EventHandlerGroup<T> after(final EventHandler<T>... handlers)
+ public final EventHandlerGroup<T> after(final EventHandler<T>... handlers)
{
final Sequence[] sequences = new Sequence[handlers.length];
for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
@@ -292,7 +300,7 @@ public class Disruptor<T>
sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
}
- return new EventHandlerGroup<T>(this, consumerRepository, sequences);
+ return new EventHandlerGroup<>(this, consumerRepository, sequences);
}
/**
@@ -310,7 +318,7 @@ public class Disruptor<T>
consumerRepository.add(processor);
}
- return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
+ return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
}
/**
@@ -551,7 +559,7 @@ public class Disruptor<T>
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
- new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
+ new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
@@ -564,7 +572,7 @@ public class Disruptor<T>
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
- return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
+ return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
@@ -596,7 +604,7 @@ public class Disruptor<T>
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
- final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
+ final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
@@ -605,7 +613,7 @@ public class Disruptor<T>
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
- return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
+ return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
private void checkNotStarted()
=====================================
src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
+++ b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
@@ -57,7 +57,7 @@ public class EventHandlerGroup<T>
System.arraycopy(
otherHandlerGroup.sequences, 0,
combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
- return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
+ return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
}
/**
@@ -77,7 +77,7 @@ public class EventHandlerGroup<T>
}
System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);
- return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
+ return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
}
/**
@@ -92,7 +92,8 @@ public class EventHandlerGroup<T>
* @param handlers the batch handlers that will process events.
* @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
*/
- public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
+ @SafeVarargs
+ public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
return handleEventsWith(handlers);
}
@@ -107,7 +108,8 @@ public class EventHandlerGroup<T>
* @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
*/
- public EventHandlerGroup<T> then(final EventProcessorFactory<T>... eventProcessorFactories)
+ @SafeVarargs
+ public final EventHandlerGroup<T> then(final EventProcessorFactory<T>... eventProcessorFactories)
{
return handleEventsWith(eventProcessorFactories);
}
@@ -125,7 +127,8 @@ public class EventHandlerGroup<T>
* @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
* @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
*/
- public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
+ @SafeVarargs
+ public final EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
{
return handleEventsWithWorkerPool(handlers);
}
@@ -142,7 +145,8 @@ public class EventHandlerGroup<T>
* @param handlers the batch handlers that will process events.
* @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
*/
- public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
+ @SafeVarargs
+ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return disruptor.createEventProcessors(sequences, handlers);
}
@@ -159,7 +163,8 @@ public class EventHandlerGroup<T>
* @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
*/
- public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
+ @SafeVarargs
+ public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
{
return disruptor.createEventProcessors(sequences, eventProcessorFactories);
}
@@ -177,7 +182,8 @@ public class EventHandlerGroup<T>
* @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
* @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
*/
- public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
+ @SafeVarargs
+ public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
{
return disruptor.createWorkerPool(sequences, handlers);
}
=====================================
src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
+++ b/src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
@@ -17,6 +17,7 @@ package com.lmax.disruptor.dsl;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.ExceptionHandler;
/**
@@ -44,10 +45,20 @@ public class ExceptionHandlerSetting<T>
*
* @param exceptionHandler the exception handler to use.
*/
+ @SuppressWarnings("unchecked")
public void with(ExceptionHandler<? super T> exceptionHandler)
{
- ((BatchEventProcessor<T>) consumerRepository.getEventProcessorFor(eventHandler))
- .setExceptionHandler(exceptionHandler);
- consumerRepository.getBarrierFor(eventHandler).alert();
+ final EventProcessor eventProcessor = consumerRepository.getEventProcessorFor(eventHandler);
+ if (eventProcessor instanceof BatchEventProcessor)
+ {
+ ((BatchEventProcessor<T>) eventProcessor).setExceptionHandler(exceptionHandler);
+ consumerRepository.getBarrierFor(eventHandler).alert();
+ }
+ else
+ {
+ throw new RuntimeException(
+ "EventProcessor: " + eventProcessor + " is not a BatchEventProcessor " +
+ "and does not support exception handlers");
+ }
}
}
=====================================
src/main/java/com/lmax/disruptor/util/ThreadHints.java
=====================================
--- /dev/null
+++ b/src/main/java/com/lmax/disruptor/util/ThreadHints.java
@@ -0,0 +1,77 @@
+/* Copyright 2016 Gil Tene
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lmax.disruptor.util;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+
+
+import static java.lang.invoke.MethodType.methodType;
+
+/**
+ * This class captures possible hints that may be used by some
+ * runtimes to improve code performance. It is intended to capture hinting
+ * behaviours that are implemented in or anticipated to be spec'ed under the
+ * {@link java.lang.Thread} class in some Java SE versions, but missing in prior
+ * versions.
+ */
+public final class ThreadHints
+{
+ private static final MethodHandle ON_SPIN_WAIT_METHOD_HANDLE;
+
+ static
+ {
+ final MethodHandles.Lookup lookup = MethodHandles.lookup();
+
+ MethodHandle methodHandle = null;
+ try
+ {
+ methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
+ }
+ catch (final Exception ignore)
+ {
+ }
+
+ ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
+ }
+
+ private ThreadHints()
+ {
+ }
+
+ /**
+ * Indicates that the caller is momentarily unable to progress, until the
+ * occurrence of one or more actions on the part of other activities. By
+ * invoking this method within each iteration of a spin-wait loop construct,
+ * the calling thread indicates to the runtime that it is busy-waiting. The runtime
+ * may take action to improve the performance of invoking spin-wait loop constructions.
+ */
+ public static void onSpinWait()
+ {
+ // Call java.lang.Thread.onSpinWait() on Java SE versions that support it. Do nothing otherwise.
+ // This should optimize away to either nothing or to an inlining of java.lang.Thread.onSpinWait()
+ if (null != ON_SPIN_WAIT_METHOD_HANDLE)
+ {
+ try
+ {
+ ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
+ }
+ catch (final Throwable ignore)
+ {
+ }
+ }
+ }
+}
\ No newline at end of file
=====================================
src/main/java/com/lmax/disruptor/util/Util.java
=====================================
--- a/src/main/java/com/lmax/disruptor/util/Util.java
+++ b/src/main/java/com/lmax/disruptor/util/Util.java
@@ -16,16 +16,15 @@
package com.lmax.disruptor.util;
import java.lang.reflect.Field;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
-import sun.misc.Unsafe;
-
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.Sequence;
+
+import sun.misc.Unsafe;
+
/**
* Set of common functions used by the Disruptor
*/
@@ -127,28 +126,6 @@ public final class Util
}
/**
- * Gets the address value for the memory that backs a direct byte buffer.
- *
- * @param buffer a direct buffer to get the address from.
- * @return The system address for the buffers
- */
- @Deprecated
- public static long getAddressFromDirectByteBuffer(ByteBuffer buffer)
- {
- try
- {
- Field addressField = Buffer.class.getDeclaredField("address");
- addressField.setAccessible(true);
- return addressField.getLong(buffer);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Unable to address field from ByteBuffer", e);
- }
- }
-
-
- /**
* Calculate the log base 2 of the supplied integer, essentially reports the location
* of the highest bit.
*
=====================================
src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
=====================================
--- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
@@ -1,24 +1,16 @@
package com.lmax.disruptor.offheap;
+import com.lmax.disruptor.*;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;
-import com.lmax.disruptor.AbstractPerfTestDisruptor;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.DataProvider;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.Sequencer;
-import com.lmax.disruptor.SingleProducerSequencer;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-
public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
{
private static final int BLOCK_SIZE = 256;
@@ -99,9 +91,10 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
@Override
public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws Exception
{
- for (int i = 0; i < BLOCK_SIZE; i += 8)
+ final int start = event.position();
+ for (int i = start, size = start + BLOCK_SIZE; i < size; i += 8)
{
- total += event.getLong();
+ total += event.getLong(i);
}
if (--expectedCount == 0)
@@ -134,7 +127,7 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
@Override
protected ByteBuffer initialValue()
{
- return buffer.duplicate();
+ return buffer.duplicate().order(ByteOrder.nativeOrder());
}
};
@@ -143,7 +136,7 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
- buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize);
+ buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize).order(ByteOrder.nativeOrder());
}
public void addGatingSequences(Sequence sequence)
=====================================
src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
=====================================
--- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
@@ -1,32 +1,30 @@
package com.lmax.disruptor.offheap;
+import com.lmax.disruptor.*;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;
-import com.lmax.disruptor.AbstractPerfTestDisruptor;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-
public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
{
private static final int BLOCK_SIZE = 256;
private static final int BUFFER_SIZE = 1024 * 1024;
private static final long ITERATIONS = 1000 * 1000 * 10L;
+ private static final boolean SLICED_BUFFER = Boolean.getBoolean("sliced");
private final Executor executor = Executors.newFixedThreadPool(1, DaemonThreadFactory.INSTANCE);
private final WaitStrategy waitStrategy = new YieldingWaitStrategy();
private final RingBuffer<ByteBuffer> buffer =
- RingBuffer.createSingleProducer(BufferFactory.direct(BLOCK_SIZE), BUFFER_SIZE, waitStrategy);
- private final ByteBufferHandler handler = new ByteBufferHandler();
+ RingBuffer.createSingleProducer(
+ SLICED_BUFFER ? SlicedBufferFactory.direct(BLOCK_SIZE, BUFFER_SIZE) : BufferFactory.direct(BLOCK_SIZE),
+ BUFFER_SIZE, waitStrategy);
+ private final ByteBufferHandler handler = new ByteBufferHandler();
private final BatchEventProcessor<ByteBuffer> processor =
new BatchEventProcessor<ByteBuffer>(buffer, buffer.newBarrier(), handler);
@@ -103,7 +101,7 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
{
for (int i = 0; i < BLOCK_SIZE; i += 8)
{
- total += event.getLong();
+ total += event.getLong(i);
}
if (--expectedCount == 0)
@@ -140,11 +138,11 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
{
if (isDirect)
{
- return ByteBuffer.allocateDirect(size);
+ return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
}
else
{
- return ByteBuffer.allocate(size);
+ return ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
}
}
@@ -159,4 +157,51 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
return new BufferFactory(false, size);
}
}
+
+ private static final class SlicedBufferFactory implements EventFactory<ByteBuffer>
+ {
+ private final boolean isDirect;
+ private final int size;
+ private final int total;
+ private ByteBuffer buffer;
+
+ private SlicedBufferFactory(boolean isDirect, int size, int total)
+ {
+ this.isDirect = isDirect;
+ this.size = size;
+ this.total = total;
+ this.buffer =
+ (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
+ .order(ByteOrder.nativeOrder());
+ this.buffer.limit(0);
+ }
+
+ @Override
+ public ByteBuffer newInstance()
+ {
+ if (this.buffer.limit() == this.buffer.capacity())
+ {
+ this.buffer =
+ (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
+ .order(ByteOrder.nativeOrder());
+ this.buffer.limit(0);
+ }
+ final int limit = this.buffer.limit();
+ this.buffer.limit(limit + size);
+ this.buffer.position(limit);
+ final ByteBuffer slice = this.buffer.slice().order(ByteOrder.nativeOrder());
+ return slice;
+ }
+
+ public static SlicedBufferFactory direct(int size, int total)
+ {
+ return new SlicedBufferFactory(true, size, total);
+ }
+
+ @SuppressWarnings("unused")
+ public static SlicedBufferFactory heap(int size, int total)
+ {
+ return new SlicedBufferFactory(false, size, total);
+ }
+ }
}
=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -205,7 +205,7 @@ public final class BatchEventProcessorTest
};
final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
- final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+ final BatchEventProcessor p1 = new BatchEventProcessor<>(dp, barrier, h1);
Thread t1 = new Thread(p1);
p1.halt();
@@ -217,7 +217,7 @@ public final class BatchEventProcessorTest
for (int i = 0; i < 1000; i++)
{
final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
- final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2);
Thread t2 = new Thread(p2);
t2.start();
p2.halt();
@@ -229,7 +229,7 @@ public final class BatchEventProcessorTest
for (int i = 0; i < 1000; i++)
{
final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
- final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+ final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2);
Thread t2 = new Thread(p2);
t2.start();
Thread.yield();
=====================================
src/test/java/com/lmax/disruptor/RingBufferTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/RingBufferTest.java
+++ b/src/test/java/com/lmax/disruptor/RingBufferTest.java
@@ -599,6 +599,22 @@ public class RingBufferTest
try
{
ringBuffer.publishEvents(new EventTranslator[]{translator, translator, translator, translator}, 1, 0);
+ }
+ finally
+ {
+ assertEmptyRingBuffer(ringBuffer);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotTryPublishEventsWhenBatchSizeIs0() throws Exception
+ {
+ RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+ EventTranslator<Object[]> translator = new NoArgEventTranslator();
+
+ try
+ {
ringBuffer.tryPublishEvents(new EventTranslator[]{translator, translator, translator, translator}, 1, 0);
}
finally
@@ -717,7 +733,22 @@ public class RingBufferTest
try
{
ringBuffer.publishEvents(translator, 1, 0, new String[]{"Foo", "Foo"});
- assertFalse(ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}));
+ }
+ finally
+ {
+ assertEmptyRingBuffer(ringBuffer);
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotTryPublishEventsOneArgWhenBatchSizeIs0() throws Exception
+ {
+ RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+ EventTranslatorOneArg<Object[], String> translator = new OneArgEventTranslator();
+
+ try
+ {
+ ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"});
}
finally
{
@@ -829,10 +860,22 @@ public class RingBufferTest
try
{
ringBuffer.publishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"});
- assertFalse(
- ringBuffer.tryPublishEvents(
- translator, 1, 0, new String[]{"Foo", "Foo"},
- new String[]{"Bar", "Bar"}));
+ }
+ finally
+ {
+ assertEmptyRingBuffer(ringBuffer);
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotTryPublishEventsTwoArgWhenBatchSizeIs0() throws Exception
+ {
+ RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+ EventTranslatorTwoArg<Object[], String, String> translator = new TwoArgEventTranslator();
+
+ try
+ {
+ ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"});
}
finally
{
@@ -946,10 +989,24 @@ public class RingBufferTest
ringBuffer.publishEvents(
translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"},
new String[]{"Baz", "Baz"});
- assertFalse(
- ringBuffer.tryPublishEvents(
+ }
+ finally
+ {
+ assertEmptyRingBuffer(ringBuffer);
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotTryPublishEventsThreeArgWhenBatchSizeIs0() throws Exception
+ {
+ RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+ EventTranslatorThreeArg<Object[], String, String, String> translator = new ThreeArgEventTranslator();
+
+ try
+ {
+ ringBuffer.tryPublishEvents(
translator, 1, 0, new String[]{"Foo", "Foo"},
- new String[]{"Bar", "Bar"}, new String[]{"Baz", "Baz"}));
+ new String[]{"Bar", "Bar"}, new String[]{"Baz", "Baz"});
}
finally
{
@@ -1079,11 +1136,25 @@ public class RingBufferTest
"Foo2", "Bar2",
"Baz2", "Bam2"
});
- assertFalse(
- ringBuffer.tryPublishEvents(
+ }
+ finally
+ {
+ assertEmptyRingBuffer(ringBuffer);
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotTryPublishEventsVarArgWhenBatchSizeIs0() throws Exception
+ {
+ RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+ VarArgEventTranslator translator = new VarArgEventTranslator();
+
+ try
+ {
+ ringBuffer.tryPublishEvents(
translator, 1, 0, new String[]{"Foo0", "Bar0", "Baz0", "Bam0"},
new String[]{"Foo1", "Bar1", "Baz1", "Bam1"},
- new String[]{"Foo2", "Bar2", "Baz2", "Bam2"}));
+ new String[]{"Foo2", "Bar2", "Baz2", "Bam2"});
}
finally
{
=====================================
src/test/java/com/lmax/disruptor/collections/HistogramTest.java deleted
=====================================
--- a/src/test/java/com/lmax/disruptor/collections/HistogramTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright 2011 LMAX Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.lmax.disruptor.collections;
-
-import org.junit.Test;
-
-import java.math.BigDecimal;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class HistogramTest
-{
- public static final long[] INTERVALS = new long[]{1, 10, 100, 1000, Long.MAX_VALUE};
- private Histogram histogram = new Histogram(INTERVALS);
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowExceptionOnInvalidBounds()
- {
- new Histogram(new long[]{});
- }
-
- @Test
- public void shouldSizeBasedOnBucketConfiguration()
- {
- assertThat(Long.valueOf(histogram.getSize()), is(Long.valueOf(INTERVALS.length)));
- }
-
- @Test
- public void shouldWalkIntervals()
- {
- for (int i = 0, size = histogram.getSize(); i < size; i++)
- {
- assertThat(Long.valueOf(histogram.getUpperBoundAt(i)), is(Long.valueOf(INTERVALS[i])));
- }
- }
-
- @Test
- public void shouldConfirmIntervalsAreInitialised()
- {
- for (int i = 0, size = histogram.getSize(); i < size; i++)
- {
- assertThat(Long.valueOf(histogram.getCountAt(i)), is(Long.valueOf(0L)));
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowExceptionWhenIntervalLessThanOrEqualToZero()
- {
- new Histogram(new long[]{-1, 10, 20});
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowExceptionWhenIntervalDoNotIncrease()
- {
- new Histogram(new long[]{1, 10, 10, 20});
- }
-
- @Test
- public void shouldAddObservation()
- {
- assertTrue(histogram.addObservation(10L));
- assertThat(Long.valueOf(histogram.getCountAt(1)), is(Long.valueOf(1L)));
- }
-
- @Test
- public void shouldNotAddObservation()
- {
- Histogram histogram = new Histogram(new long[]{10, 20, 30});
- assertFalse(histogram.addObservation(31));
- }
-
- @Test
- public void shouldAddObservations()
- {
- addObservations(histogram, 10L, 30L, 50L);
-
- Histogram histogram2 = new Histogram(INTERVALS);
- addObservations(histogram2, 10L, 20L, 25L);
-
- histogram.addObservations(histogram2);
-
- assertThat(Long.valueOf(6L), is(Long.valueOf(histogram.getCount())));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowExceptionWhenIntervalsDoNotMatch()
- {
- long[] intervals = INTERVALS.clone();
- intervals[0]++;
-
- Histogram histogram2 = new Histogram(intervals);
- histogram.addObservations(histogram2);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowExceptionWhenIntervalsLengthDoNotMatch()
- {
- Histogram histogram2 = new Histogram(new long[]{1L, 2L, 3L});
- histogram.addObservations(histogram2);
- }
-
- @Test
- public void shouldClearCounts()
- {
- addObservations(histogram, 1L, 7L, 10L, 3000L);
- histogram.clear();
-
- for (int i = 0, size = histogram.getSize(); i < size; i++)
- {
- assertThat(Long.valueOf(histogram.getCountAt(i)), is(Long.valueOf(0)));
- }
- }
-
- @Test
- public void shouldCountTotalObservations()
- {
- addObservations(histogram, 1L, 7L, 10L, 3000L);
-
- assertThat(Long.valueOf(histogram.getCount()), is(Long.valueOf(4L)));
- }
-
- @Test
- public void shouldGetMeanZeroObservation()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- assertThat(histogram.getMean(), is(BigDecimal.ZERO));
- }
-
- @Test
- public void shouldGetMeanObservation()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
- assertThat(histogram.getMean(), is(new BigDecimal("32.67")));
- }
-
- @Test
- public void shouldCorrectMeanForSkewInTopAndBottomPopulatedIntervals()
- {
- final long[] intervals = new long[]{100, 110, 120, 130, 140, 150, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- for (long i = 100; i < 152; i++)
- {
- histogram.addObservation(i);
- }
-
- assertThat(histogram.getMean(), is(new BigDecimal("125.02")));
- }
-
- @Test
- public void shouldGetMaxObservation()
- {
- addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
- assertThat(Long.valueOf(histogram.getMax()), is(Long.valueOf(144L)));
- }
-
- @Test
- public void shouldGetMinObservation()
- {
- addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
- assertThat(Long.valueOf(histogram.getMin()), is(Long.valueOf(1L)));
- }
-
- @Test
- public void shouldGetMinAndMaxOfSingleObservation()
- {
- addObservations(histogram, 10L);
-
- assertThat(Long.valueOf(histogram.getMin()), is(Long.valueOf(10L)));
- assertThat(Long.valueOf(histogram.getMax()), is(Long.valueOf(10L)));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldGetExceptionUpperBoundForFactorZero()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- Long.valueOf(histogram.getUpperBoundForFactor(0.0));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldGetExceptionUpperBoundForFactorOne()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- Long.valueOf(histogram.getUpperBoundForFactor(1.0));
- }
-
- @Test
- public void shouldGetZeroUpperBound()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- assertThat(Long.valueOf(histogram.getUpperBoundForFactor(0.5)), is(Long.valueOf(0L)));
- }
-
- @Test
- public void shouldGetTwoNinesUpperBound()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- for (long i = 1; i < 101; i++)
- {
- histogram.addObservation(i);
- }
-
- assertThat(Long.valueOf(histogram.getTwoNinesUpperBound()), is(Long.valueOf(100L)));
- }
-
- @Test
- public void shouldGetFourNinesUpperBound()
- {
- final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
- final Histogram histogram = new Histogram(intervals);
-
- for (long i = 1; i < 102; i++)
- {
- histogram.addObservation(i);
- }
-
- assertThat(Long.valueOf(histogram.getFourNinesUpperBound()), is(Long.valueOf(1000L)));
- }
-
- @Test
- public void shouldToString()
- {
- addObservations(histogram, 1L, 7L, 10L, 300L);
-
- String expectedResults =
- "Histogram{min=1, max=300, mean=53.25, 99%=1000, 99.99%=1000, [1=1, 10=2, 100=0, 1000=1, 9223372036854775807=0]}";
- assertThat(histogram.toString(), is(expectedResults));
- }
-
- private void addObservations(final Histogram histogram, final long... observations)
- {
- for (int i = 0, size = observations.length; i < size; i++)
- {
- histogram.addObservation(observations[i]);
- }
- }
-}
=====================================
src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
+++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
@@ -20,6 +20,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
@@ -134,6 +135,40 @@ public class DisruptorTest
}
}
+
+ @Test
+ public void shouldBatchOfEvents() throws Exception
+ {
+ final CountDownLatch eventCounter = new CountDownLatch(2);
+ disruptor.handleEventsWith(new EventHandler<TestEvent>()
+ {
+ @Override
+ public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
+ {
+ eventCounter.countDown();
+ }
+ });
+
+ disruptor.start();
+
+ disruptor.publishEvents(
+ new EventTranslatorOneArg<TestEvent, Object>()
+ {
+ @Override
+ public void translateTo(final TestEvent event, final long sequence, Object arg)
+ {
+ lastPublishedEvent = event;
+ }
+ },
+ new Object[] { "a", "b" }
+ );
+
+ if (!eventCounter.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
+ }
+ }
+
@Test
public void shouldAddEventProcessorsAfterPublishing() throws Exception
{
@@ -323,6 +358,7 @@ public class DisruptorTest
disruptor.after(handler2);
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors()
throws Exception
@@ -341,6 +377,7 @@ public class DisruptorTest
assertSame(testException, actualException);
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors()
throws Exception
=====================================
src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
=====================================
--- /dev/null
+++ b/src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
@@ -0,0 +1,132 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ShutdownOnError
+{
+ private static class Event
+ {
+ public long value;
+
+ public static final EventFactory<Event> FACTORY = new EventFactory<Event>()
+ {
+ @Override
+ public Event newInstance()
+ {
+ return new Event();
+ }
+ };
+ }
+
+ private static class DefaultThreadFactory implements ThreadFactory
+ {
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r);
+ }
+ }
+
+ private static class Handler implements EventHandler<Event>
+ {
+ @Override
+ public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception
+ {
+ // do work, if a failure occurs throw exception.
+ }
+ }
+
+ private static final class ErrorHandler implements ExceptionHandler<Event>
+ {
+ private final AtomicBoolean running;
+
+ private ErrorHandler(AtomicBoolean running)
+ {
+ this.running = running;
+ }
+
+ @Override
+ public void handleEventException(Throwable ex, long sequence, Event event)
+ {
+ if (execeptionIsFatal(ex))
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private boolean execeptionIsFatal(Throwable ex)
+ {
+ // Do what is appropriate here.
+ return true;
+ }
+
+ @Override
+ public void handleOnStartException(Throwable ex)
+ {
+
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable ex)
+ {
+
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ Disruptor<Event> disruptor = new Disruptor<>(Event.FACTORY, 1024, new DefaultThreadFactory());
+
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ ErrorHandler errorHandler = new ErrorHandler(running);
+
+ final Handler handler = new Handler();
+ disruptor.handleEventsWith(handler);
+ disruptor.handleExceptionsFor(handler).with(errorHandler);
+
+ simplePublish(disruptor, running);
+ }
+
+ private static void simplePublish(Disruptor<Event> disruptor, AtomicBoolean running)
+ {
+ while (running.get())
+ {
+ disruptor.publishEvent(new EventTranslator<Event>()
+ {
+ @Override
+ public void translateTo(Event event, long sequence)
+ {
+ event.value = sequence;
+ }
+ });
+ }
+ }
+
+ private static void smarterPublish(Disruptor<Event> disruptor, AtomicBoolean running)
+ {
+ final RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
+
+ boolean publishOk;
+ do
+ {
+ publishOk = ringBuffer.tryPublishEvent(new EventTranslator<Event>()
+ {
+ @Override
+ public void translateTo(Event event, long sequence)
+ {
+ event.value = sequence;
+ }
+ });
+ }
+ while (publishOk && running.get());
+ }
+}
=====================================
src/test/java/com/lmax/disruptor/util/UtilTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/util/UtilTest.java
+++ b/src/test/java/com/lmax/disruptor/util/UtilTest.java
@@ -16,15 +16,10 @@
package com.lmax.disruptor.util;
import com.lmax.disruptor.Sequence;
+
import org.junit.Assert;
import org.junit.Test;
-import java.nio.ByteBuffer;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
-
public final class UtilTest
{
@Test
@@ -57,11 +52,4 @@ public final class UtilTest
Assert.assertEquals(Long.MAX_VALUE, Util.getMinimumSequence(sequences));
}
-
- @Test
- public void shouldGetByteBufferAddress() throws Exception
- {
- ByteBuffer buffer = ByteBuffer.allocateDirect(16);
- assertThat(Util.getAddressFromDirectByteBuffer(buffer), is(not(0L)));
- }
}
View it on GitLab: https://salsa.debian.org/java-team/disruptor/commit/fa9451b4f7fd9acaee9df7b620f8ce48d806bfe0
--
View it on GitLab: https://salsa.debian.org/java-team/disruptor/commit/fa9451b4f7fd9acaee9df7b620f8ce48d806bfe0
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180625/51b5e8d0/attachment.html>
More information about the pkg-java-commits mailing list