[Git][java-team/disruptor][upstream] New upstream version 3.4.2

Tony Mancill gitlab at salsa.debian.org
Mon Jun 25 06:18:54 BST 2018


Tony Mancill pushed to branch upstream at Debian Java Maintainers / disruptor


Commits:
fa9451b4 by tony mancill at 2018-06-24T10:54:50-07:00
New upstream version 3.4.2
- - - - -


30 changed files:

- README.md
- build.gradle
- runoffheap.sh
- src/main/java/com/lmax/disruptor/AggregateEventHandler.java
- src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
- src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
- src/main/java/com/lmax/disruptor/EventHandler.java
- src/main/java/com/lmax/disruptor/EventProcessor.java
- − src/main/java/com/lmax/disruptor/Foo.java
- src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
- src/main/java/com/lmax/disruptor/SequenceGroup.java
- src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
- src/main/java/com/lmax/disruptor/WorkerPool.java
- src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
- − src/main/java/com/lmax/disruptor/collections/Histogram.java
- src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
- src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
- src/main/java/com/lmax/disruptor/dsl/Disruptor.java
- src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
- src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
- + src/main/java/com/lmax/disruptor/util/ThreadHints.java
- src/main/java/com/lmax/disruptor/util/Util.java
- src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
- src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
- src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
- src/test/java/com/lmax/disruptor/RingBufferTest.java
- − src/test/java/com/lmax/disruptor/collections/HistogramTest.java
- src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
- + src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
- src/test/java/com/lmax/disruptor/util/UtilTest.java


Changes:

=====================================
README.md
=====================================
--- a/README.md
+++ b/README.md
@@ -15,21 +15,23 @@ A High Performance Inter-Thread Messaging Library
 
 ## Changelog
 
-### 3.3.11
+### 3.4.2
 
 - Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
 
-### 3.3.10
+### 3.4.1
 
-- Fix race condition in BatchEventProcessor between run() and halt().
+ - Fix race between run() and halt() on BatchEventProcessor.
 
-### 3.3.9
+### 3.4.0
 
-- Change SleepingWaitStrategy to use a parkNanos(100).
+ - Drop support for JDK6, support JDK7 and above only.
+ - Add `ThreadHints.onSpinWait` to all busy spins within Disruptor.
+ - Increase default sleep time for LockSupport.parkNanos to prevent busy spinning.
 
 ### 3.3.8
 
-- Revert belt and braces WaitStategy signalling.
+- Revert belt and braces WaitStrategy signalling.
 
 ### 3.3.7
 


=====================================
build.gradle
=====================================
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
 defaultTasks 'build'
 
 group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 11)
+version = new Version(major: 3, minor: 4, revision: 2)
 
 ext {
     fullName = 'Disruptor Framework'
@@ -61,8 +61,8 @@ idea.module {
     scopes.TEST.plus += [ configurations.perfCompile ]
 }
 
-sourceCompatibility = 1.6
-targetCompatibility = 1.6
+sourceCompatibility = 1.7
+targetCompatibility = 1.7
 
 
 compileJava {
@@ -175,7 +175,7 @@ task perfJar(type: Jar) {
 }
 
 task wrapper(type: Wrapper) {
-    gradleVersion = '4.2'
+    gradleVersion = '4.3'
 }
 
 class Version {


=====================================
runoffheap.sh
=====================================
--- a/runoffheap.sh
+++ b/runoffheap.sh
@@ -14,4 +14,8 @@ echo "Done"
 echo "Running OnHeap..."
 $BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
 echo "Done"
+
+echo "Running Sliced OnHeap..."
+$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH -Dsliced=true com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
+echo "Done"
      


=====================================
src/main/java/com/lmax/disruptor/AggregateEventHandler.java
=====================================
--- a/src/main/java/com/lmax/disruptor/AggregateEventHandler.java
+++ b/src/main/java/com/lmax/disruptor/AggregateEventHandler.java
@@ -30,6 +30,7 @@ public final class AggregateEventHandler<T>
      *
      * @param eventHandlers to be called in sequence.
      */
+    @SafeVarargs
     public AggregateEventHandler(final EventHandler<T>... eventHandlers)
     {
         this.eventHandlers = eventHandlers;


=====================================
src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java
@@ -19,6 +19,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.lmax.disruptor.util.ThreadHints;
+
 /**
  * Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
  * <p>
@@ -54,6 +56,7 @@ public final class BlockingWaitStrategy implements WaitStrategy
         while ((availableSequence = dependentSequence.get()) < sequence)
         {
             barrier.checkAlert();
+            ThreadHints.onSpinWait();
         }
 
         return availableSequence;


=====================================
src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/BusySpinWaitStrategy.java
@@ -16,6 +16,8 @@
 package com.lmax.disruptor;
 
 
+import com.lmax.disruptor.util.ThreadHints;
+
 /**
  * Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
  * <p>
@@ -34,6 +36,7 @@ public final class BusySpinWaitStrategy implements WaitStrategy
         while ((availableSequence = dependentSequence.get()) < sequence)
         {
             barrier.checkAlert();
+            ThreadHints.onSpinWait();
         }
 
         return availableSequence;


=====================================
src/main/java/com/lmax/disruptor/EventHandler.java
=====================================
--- a/src/main/java/com/lmax/disruptor/EventHandler.java
+++ b/src/main/java/com/lmax/disruptor/EventHandler.java
@@ -24,7 +24,12 @@ package com.lmax.disruptor;
 public interface EventHandler<T>
 {
     /**
-     * Called when a publisher has published an event to the {@link RingBuffer}
+     * Called when a publisher has published an event to the {@link RingBuffer}.  The {@link BatchEventProcessor} will
+     * read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
+     * processed without having to wait for any new event to arrive.  This can be useful for event handlers that need
+     * to do slower operations like I/O as they can group together the data from multiple events into a single
+     * operation.  Implementations should ensure that the operation is always performed when endOfBatch is true as
+     * the time between that message an the next one is inderminate.
      *
      * @param event      published to the {@link RingBuffer}
      * @param sequence   of the event being processed


=====================================
src/main/java/com/lmax/disruptor/EventProcessor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/EventProcessor.java
+++ b/src/main/java/com/lmax/disruptor/EventProcessor.java
@@ -16,7 +16,10 @@
 package com.lmax.disruptor;
 
 /**
- * EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
+ * An EventProcessor needs to be an implementation of a runnable that will poll for events from the {@link RingBuffer}
+ * using the appropriate wait strategy.  It is unlikely that you will need to implement this interface yourself.
+ * Look at using the {@link EventHandler} interface along with the pre-supplied BatchEventProcessor in the first
+ * instance.
  * <p>
  * An EventProcessor will generally be associated with a Thread for execution.
  */


=====================================
src/main/java/com/lmax/disruptor/Foo.java deleted
=====================================
--- a/src/main/java/com/lmax/disruptor/Foo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.lmax.disruptor;
-
-/**
- * Created by barkerm on 13/06/17.
- */
-public class Foo
-{
-    int a;
-    int b;
-    short c;
-    short d;
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        Foo foo = (Foo) o;
-
-        if (a != foo.a) return false;
-        if (b != foo.b) return false;
-        if (c != foo.c) return false;
-        return d == foo.d;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = a;
-        result = 31 * result + b;
-        result = 31 * result + (int) c;
-        result = 31 * result + (int) d;
-        return result;
-    }
-}


=====================================
src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java
@@ -20,6 +20,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.lmax.disruptor.util.ThreadHints;
+
 /**
  * Variation of the {@link BlockingWaitStrategy} that attempts to elide conditional wake-ups when
  * the lock is uncontended.  Shows performance improvements on microbenchmarks.  However this
@@ -66,6 +68,7 @@ public final class LiteBlockingWaitStrategy implements WaitStrategy
         while ((availableSequence = dependentSequence.get()) < sequence)
         {
             barrier.checkAlert();
+            ThreadHints.onSpinWait();
         }
 
         return availableSequence;


=====================================
src/main/java/com/lmax/disruptor/SequenceGroup.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SequenceGroup.java
+++ b/src/main/java/com/lmax/disruptor/SequenceGroup.java
@@ -60,9 +60,9 @@ public final class SequenceGroup extends Sequence
     public void set(final long value)
     {
         final Sequence[] sequences = this.sequences;
-        for (int i = 0, size = sequences.length; i < size; i++)
+        for (Sequence sequence : sequences)
         {
-            sequences[i].set(value);
+            sequence.set(value);
         }
     }
 


=====================================
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
@@ -19,12 +19,14 @@ import java.util.concurrent.locks.LockSupport;
 
 /**
  * Sleeping strategy that initially spins, then uses a Thread.yield(), and
- * eventually sleep (<code>LockSupport.parkNanos(1)</code>) for the minimum
+ * eventually sleep (<code>LockSupport.parkNanos(n)</code>) for the minimum
  * number of nanos the OS and JVM will allow while the
  * {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
  * <p>
  * This strategy is a good compromise between performance and CPU resource.
- * Latency spikes can occur after quiet periods.
+ * Latency spikes can occur after quiet periods.  It will also reduce the impact
+ * on the producing thread as it will not need signal any conditional variables
+ * to wake up the event handling thread.
  */
 public final class SleepingWaitStrategy implements WaitStrategy
 {


=====================================
src/main/java/com/lmax/disruptor/WorkerPool.java
=====================================
--- a/src/main/java/com/lmax/disruptor/WorkerPool.java
+++ b/src/main/java/com/lmax/disruptor/WorkerPool.java
@@ -45,6 +45,7 @@ public final class WorkerPool<T>
      * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
      * @param workHandlers     to distribute the work load across.
      */
+    @SafeVarargs
     public WorkerPool(
         final RingBuffer<T> ringBuffer,
         final SequenceBarrier sequenceBarrier,
@@ -57,7 +58,7 @@ public final class WorkerPool<T>
 
         for (int i = 0; i < numWorkers; i++)
         {
-            workProcessors[i] = new WorkProcessor<T>(
+            workProcessors[i] = new WorkProcessor<>(
                 ringBuffer,
                 sequenceBarrier,
                 workHandlers[i],
@@ -75,6 +76,7 @@ public final class WorkerPool<T>
      * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
      * @param workHandlers     to distribute the work load across.
      */
+    @SafeVarargs
     public WorkerPool(
         final EventFactory<T> eventFactory,
         final ExceptionHandler<? super T> exceptionHandler,
@@ -87,7 +89,7 @@ public final class WorkerPool<T>
 
         for (int i = 0; i < numWorkers; i++)
         {
-            workProcessors[i] = new WorkProcessor<T>(
+            workProcessors[i] = new WorkProcessor<>(
                 ringBuffer,
                 barrier,
                 workHandlers[i],


=====================================
src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
=====================================
--- a/src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
+++ b/src/main/java/com/lmax/disruptor/YieldingWaitStrategy.java
@@ -20,7 +20,8 @@ package com.lmax.disruptor;
  * Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
  * after an initially spinning.
  * <p>
- * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
+ * This strategy will use 100% CPU, but will more readily give up the CPU than a busy spin strategy if other threads
+ * require CPU resource.
  */
 public final class YieldingWaitStrategy implements WaitStrategy
 {


=====================================
src/main/java/com/lmax/disruptor/collections/Histogram.java deleted
=====================================
--- a/src/main/java/com/lmax/disruptor/collections/Histogram.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Copyright 2011 LMAX Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.lmax.disruptor.collections;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.Arrays;
-
-/**
- * <p>Histogram for tracking the frequency of observations of values below interval upper bounds.</p>
- *
- * <p>This class is useful for recording timings across a large number of observations
- * when high performance is required.</p>
- *
- * <p>The interval bounds are used to define the ranges of the histogram buckets. If provided bounds
- * are [10,20,30,40,50] then there will be five buckets, accessible by index 0-4. Any value
- * 0-10 will fall into the first interval bar, values 11-20 will fall into the
- * second bar, and so on.</p>
- */
-@Deprecated
-public final class Histogram
-{
-    // tracks the upper intervals of each of the buckets/bars
-    private final long[] upperBounds;
-    // tracks the count of the corresponding bucket
-    private final long[] counts;
-    // minimum value so far observed
-    private long minValue = Long.MAX_VALUE;
-    // maximum value so far observed
-    private long maxValue = 0L;
-
-    /**
-     * Create a new Histogram with a provided list of interval bounds.
-     *
-     * @param upperBounds of the intervals. Bounds must be provided in order least to greatest, and
-     *                    lowest bound must be greater than or equal to 1.
-     * @throws IllegalArgumentException if any of the upper bounds are less than or equal to zero
-     * @throws IllegalArgumentException if the bounds are not in order, least to greatest
-     */
-    public Histogram(final long[] upperBounds)
-    {
-        validateBounds(upperBounds);
-
-        this.upperBounds = Arrays.copyOf(upperBounds, upperBounds.length);
-        this.counts = new long[upperBounds.length];
-    }
-
-    /**
-     * Validates the input bounds; used by constructor only.
-     */
-    private void validateBounds(final long[] upperBounds)
-    {
-        long lastBound = -1L;
-        if (upperBounds.length <= 0)
-        {
-            throw new IllegalArgumentException("Must provide at least one interval");
-        }
-        for (final long bound : upperBounds)
-        {
-            if (bound <= 0L)
-            {
-                throw new IllegalArgumentException("Bounds must be positive values");
-            }
-
-            if (bound <= lastBound)
-            {
-                throw new IllegalArgumentException("bound " + bound + " is not greater than " + lastBound);
-            }
-
-            lastBound = bound;
-        }
-    }
-
-    /**
-     * Size of the list of interval bars (ie: count of interval bars).
-     *
-     * @return size of the interval bar list.
-     */
-    public int getSize()
-    {
-        return upperBounds.length;
-    }
-
-    /**
-     * Get the upper bound of an interval for an index.
-     *
-     * @param index of the upper bound.
-     * @return the interval upper bound for the index.
-     */
-    public long getUpperBoundAt(final int index)
-    {
-        return upperBounds[index];
-    }
-
-    /**
-     * Get the count of observations at a given index.
-     *
-     * @param index of the observations counter.
-     * @return the count of observations at a given index.
-     */
-    public long getCountAt(final int index)
-    {
-        return counts[index];
-    }
-
-    /**
-     * Add an observation to the histogram and increment the counter for the interval it matches.
-     *
-     * @param value for the observation to be added.
-     * @return return true if in the range of intervals and successfully added observation; otherwise false.
-     */
-    public boolean addObservation(final long value)
-    {
-        int low = 0;
-        int high = upperBounds.length - 1;
-
-        // do a classic binary search to find the high value
-        while (low < high)
-        {
-            int mid = low + ((high - low) >> 1);
-            if (upperBounds[mid] < value)
-            {
-                low = mid + 1;
-            }
-            else
-            {
-                high = mid;
-            }
-        }
-
-        // if the binary search found an eligible bucket, increment
-        if (value <= upperBounds[high])
-        {
-            counts[high]++;
-            trackRange(value);
-
-            return true;
-        }
-
-        // otherwise value was not found
-        return false;
-    }
-
-    /**
-     * Track minimum and maximum observations
-     */
-    private void trackRange(final long value)
-    {
-        if (value < minValue)
-        {
-            minValue = value;
-        }
-
-        if (value > maxValue)
-        {
-            maxValue = value;
-        }
-    }
-
-    /**
-     * <p>Add observations from another Histogram into this one.</p>
-     *
-     * <p>Histograms must have the same intervals.</p>
-     *
-     * @param histogram from which to add the observation counts.
-     * @throws IllegalArgumentException if interval count or values do not match exactly
-     */
-    public void addObservations(final Histogram histogram)
-    {
-        // validate the intervals
-        if (upperBounds.length != histogram.upperBounds.length)
-        {
-            throw new IllegalArgumentException("Histograms must have matching intervals");
-        }
-
-        for (int i = 0, size = upperBounds.length; i < size; i++)
-        {
-            if (upperBounds[i] != histogram.upperBounds[i])
-            {
-                throw new IllegalArgumentException("Histograms must have matching intervals");
-            }
-        }
-
-        // increment all of the internal counts
-        for (int i = 0, size = counts.length; i < size; i++)
-        {
-            counts[i] += histogram.counts[i];
-        }
-
-        // refresh the minimum and maximum observation ranges
-        trackRange(histogram.minValue);
-        trackRange(histogram.maxValue);
-    }
-
-    /**
-     * Clear the list of interval counters
-     */
-    public void clear()
-    {
-        maxValue = 0L;
-        minValue = Long.MAX_VALUE;
-
-        for (int i = 0, size = counts.length; i < size; i++)
-        {
-            counts[i] = 0L;
-        }
-    }
-
-    /**
-     * Count total number of recorded observations.
-     *
-     * @return the total number of recorded observations.
-     */
-    public long getCount()
-    {
-        long count = 0L;
-
-        for (int i = 0, size = counts.length; i < size; i++)
-        {
-            count += counts[i];
-        }
-
-        return count;
-    }
-
-    /**
-     * Get the minimum observed value.
-     *
-     * @return the minimum value observed.
-     */
-    public long getMin()
-    {
-        return minValue;
-    }
-
-    /**
-     * Get the maximum observed value.
-     *
-     * @return the maximum of the observed values;
-     */
-    public long getMax()
-    {
-        return maxValue;
-    }
-
-    /**
-     * <p>Calculate the mean of all recorded observations.</p>
-     *
-     * <p>The mean is calculated by summing the mid points of each interval multiplied by the count
-     * for that interval, then dividing by the total count of observations.  The max and min are
-     * considered for adjusting the top and bottom bin when calculating the mid point, this
-     * minimises skew if the observed values are very far away from the possible histogram values.</p>
-     *
-     * @return the mean of all recorded observations.
-     */
-    public BigDecimal getMean()
-    {
-        // early exit to avoid divide by zero later
-        if (0L == getCount())
-        {
-            return BigDecimal.ZERO;
-        }
-
-        // precalculate the initial lower bound; needed in the loop
-        long lowerBound = counts[0] > 0L ? minValue : 0L;
-        // use BigDecimal to avoid precision errors
-        BigDecimal total = BigDecimal.ZERO;
-
-        // midpoint is calculated as the average between the lower and upper bound
-        // (after taking into account the min & max values seen)
-        // then, simply multiply midpoint by the count of values at the interval (intervalTotal)
-        // and add to running total (total)
-        for (int i = 0, size = upperBounds.length; i < size; i++)
-        {
-            if (0L != counts[i])
-            {
-                long upperBound = Math.min(upperBounds[i], maxValue);
-                long midPoint = lowerBound + ((upperBound - lowerBound) / 2L);
-
-                BigDecimal intervalTotal = new BigDecimal(midPoint).multiply(new BigDecimal(counts[i]));
-                total = total.add(intervalTotal);
-            }
-
-            // and recalculate the lower bound for the next time around the loop
-            lowerBound = Math.max(upperBounds[i] + 1L, minValue);
-        }
-
-        return total.divide(new BigDecimal(getCount()), 2, RoundingMode.HALF_UP);
-    }
-
-    /**
-     * Calculate the upper bound within which 99% of observations fall.
-     *
-     * @return the upper bound for 99% of observations.
-     */
-    public long getTwoNinesUpperBound()
-    {
-        return getUpperBoundForFactor(0.99d);
-    }
-
-    /**
-     * Calculate the upper bound within which 99.99% of observations fall.
-     *
-     * @return the upper bound for 99.99% of observations.
-     */
-    public long getFourNinesUpperBound()
-    {
-        return getUpperBoundForFactor(0.9999d);
-    }
-
-    /**
-     * <p>Get the interval upper bound for a given factor of the observation population.</p>
-     *
-     * <p>Note this does not get the actual percentile measurement, it only gets the bucket</p>
-     *
-     * @param factor representing the size of the population.
-     * @return the interval upper bound.
-     * @throws IllegalArgumentException if factor < 0.0 or factor > 1.0
-     */
-    public long getUpperBoundForFactor(final double factor)
-    {
-        if (0.0d >= factor || factor >= 1.0d)
-        {
-            throw new IllegalArgumentException("factor must be >= 0.0 and <= 1.0");
-        }
-
-        final long totalCount = getCount();
-        final long tailTotal = totalCount - Math.round(totalCount * factor);
-        long tailCount = 0L;
-
-        // reverse search the intervals ('tailCount' from end)
-        for (int i = counts.length - 1; i >= 0; i--)
-        {
-            if (0L != counts[i])
-            {
-                tailCount += counts[i];
-                if (tailCount >= tailTotal)
-                {
-                    return upperBounds[i];
-                }
-            }
-        }
-
-        return 0L;
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append("Histogram{");
-
-        sb.append("min=").append(getMin()).append(", ");
-        sb.append("max=").append(getMax()).append(", ");
-        sb.append("mean=").append(getMean()).append(", ");
-        sb.append("99%=").append(getTwoNinesUpperBound()).append(", ");
-        sb.append("99.99%=").append(getFourNinesUpperBound()).append(", ");
-
-        sb.append('[');
-        for (int i = 0, size = counts.length; i < size; i++)
-        {
-            sb.append(upperBounds[i]).append('=').append(counts[i]).append(", ");
-        }
-
-        if (counts.length > 0)
-        {
-            sb.setLength(sb.length() - 2);
-        }
-        sb.append(']');
-
-        sb.append('}');
-
-        return sb.toString();
-    }
-}


=====================================
src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java
@@ -11,7 +11,7 @@ import java.util.concurrent.ThreadFactory;
 public class BasicExecutor implements Executor
 {
     private final ThreadFactory factory;
-    private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
+    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
 
     public BasicExecutor(ThreadFactory factory)
     {


=====================================
src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
+++ b/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java
@@ -27,17 +27,17 @@ import java.util.*;
 class ConsumerRepository<T> implements Iterable<ConsumerInfo>
 {
     private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
-        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
+        new IdentityHashMap<>();
     private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
-        new IdentityHashMap<Sequence, ConsumerInfo>();
-    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
+        new IdentityHashMap<>();
+    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<>();
 
     public void add(
         final EventProcessor eventprocessor,
         final EventHandler<? super T> handler,
         final SequenceBarrier barrier)
     {
-        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
+        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<>(eventprocessor, handler, barrier);
         eventProcessorInfoByEventHandler.put(handler, consumerInfo);
         eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
         consumerInfos.add(consumerInfo);
@@ -45,14 +45,14 @@ class ConsumerRepository<T> implements Iterable<ConsumerInfo>
 
     public void add(final EventProcessor processor)
     {
-        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(processor, null, null);
+        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<>(processor, null, null);
         eventProcessorInfoBySequence.put(processor.getSequence(), consumerInfo);
         consumerInfos.add(consumerInfo);
     }
 
     public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier)
     {
-        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<T>(workerPool, sequenceBarrier);
+        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
         consumerInfos.add(workerPoolInfo);
         for (Sequence sequence : workerPool.getWorkerSequences())
         {
@@ -62,7 +62,7 @@ class ConsumerRepository<T> implements Iterable<ConsumerInfo>
 
     public Sequence[] getLastSequenceInChain(boolean includeStopped)
     {
-        List<Sequence> lastSequence = new ArrayList<Sequence>();
+        List<Sequence> lastSequence = new ArrayList<>();
         for (ConsumerInfo consumerInfo : consumerInfos)
         {
             if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain())


=====================================
src/main/java/com/lmax/disruptor/dsl/Disruptor.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
@@ -60,9 +60,9 @@ public class Disruptor<T>
 {
     private final RingBuffer<T> ringBuffer;
     private final Executor executor;
-    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
+    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
+    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
 
     /**
      * Create a new Disruptor. Will default to {@link com.lmax.disruptor.BlockingWaitStrategy} and
@@ -155,11 +155,14 @@ public class Disruptor<T>
      * process events before handler <code>B</code>:</p>
      * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
      *
+     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
+     *
      * @param handlers the event handlers that will process events.
      * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
      */
     @SuppressWarnings("varargs")
-    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
+    @SafeVarargs
+    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
     {
         return createEventProcessors(new Sequence[0], handlers);
     }
@@ -177,10 +180,13 @@ public class Disruptor<T>
      * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}
      * which do have barrier sequences to provide.</p>
      *
+     * <p>This call is additive, but should generally be called only once when setting up the Disruptor instance.</p>
+     *
      * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
      * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
      */
-    public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
+    @SafeVarargs
+    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
     {
         final Sequence[] barrierSequences = new Sequence[0];
         return createEventProcessors(barrierSequences, eventProcessorFactories);
@@ -212,7 +218,7 @@ public class Disruptor<T>
 
         ringBuffer.addGatingSequences(sequences);
 
-        return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
+        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
     }
 
 
@@ -224,8 +230,9 @@ public class Disruptor<T>
      * @param workHandlers the work handlers that will process events.
      * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
      */
+    @SafeVarargs
     @SuppressWarnings("varargs")
-    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
+    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
     {
         return createWorkerPool(new Sequence[0], workHandlers);
     }
@@ -270,7 +277,7 @@ public class Disruptor<T>
      */
     public ExceptionHandlerSetting<T> handleExceptionsFor(final EventHandler<T> eventHandler)
     {
-        return new ExceptionHandlerSetting<T>(eventHandler, consumerRepository);
+        return new ExceptionHandlerSetting<>(eventHandler, consumerRepository);
     }
 
     /**
@@ -283,8 +290,9 @@ public class Disruptor<T>
      *                 that will form the barrier for subsequent handlers or processors.
      * @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers.
      */
+    @SafeVarargs
     @SuppressWarnings("varargs")
-    public EventHandlerGroup<T> after(final EventHandler<T>... handlers)
+    public final EventHandlerGroup<T> after(final EventHandler<T>... handlers)
     {
         final Sequence[] sequences = new Sequence[handlers.length];
         for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
@@ -292,7 +300,7 @@ public class Disruptor<T>
             sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
         }
 
-        return new EventHandlerGroup<T>(this, consumerRepository, sequences);
+        return new EventHandlerGroup<>(this, consumerRepository, sequences);
     }
 
     /**
@@ -310,7 +318,7 @@ public class Disruptor<T>
             consumerRepository.add(processor);
         }
 
-        return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
+        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
     }
 
     /**
@@ -551,7 +559,7 @@ public class Disruptor<T>
             final EventHandler<? super T> eventHandler = eventHandlers[i];
 
             final BatchEventProcessor<T> batchEventProcessor =
-                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
+                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
 
             if (exceptionHandler != null)
             {
@@ -564,7 +572,7 @@ public class Disruptor<T>
 
         updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
 
-        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
+        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
     }
 
     private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
@@ -596,7 +604,7 @@ public class Disruptor<T>
         final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
     {
         final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
-        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
+        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
 
 
         consumerRepository.add(workerPool, sequenceBarrier);
@@ -605,7 +613,7 @@ public class Disruptor<T>
 
         updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
 
-        return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
+        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
     }
 
     private void checkNotStarted()


=====================================
src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
+++ b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
@@ -57,7 +57,7 @@ public class EventHandlerGroup<T>
         System.arraycopy(
             otherHandlerGroup.sequences, 0,
             combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
-        return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
+        return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
     }
 
     /**
@@ -77,7 +77,7 @@ public class EventHandlerGroup<T>
         }
         System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);
 
-        return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
+        return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
     }
 
     /**
@@ -92,7 +92,8 @@ public class EventHandlerGroup<T>
      * @param handlers the batch handlers that will process events.
      * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
      */
-    public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
+    @SafeVarargs
+    public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
     {
         return handleEventsWith(handlers);
     }
@@ -107,7 +108,8 @@ public class EventHandlerGroup<T>
      * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
      * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
      */
-    public EventHandlerGroup<T> then(final EventProcessorFactory<T>... eventProcessorFactories)
+    @SafeVarargs
+    public final EventHandlerGroup<T> then(final EventProcessorFactory<T>... eventProcessorFactories)
     {
         return handleEventsWith(eventProcessorFactories);
     }
@@ -125,7 +127,8 @@ public class EventHandlerGroup<T>
      * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
      * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
      */
-    public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
+    @SafeVarargs
+    public final EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
     {
         return handleEventsWithWorkerPool(handlers);
     }
@@ -142,7 +145,8 @@ public class EventHandlerGroup<T>
      * @param handlers the batch handlers that will process events.
      * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
      */
-    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
+    @SafeVarargs
+    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
     {
         return disruptor.createEventProcessors(sequences, handlers);
     }
@@ -159,7 +163,8 @@ public class EventHandlerGroup<T>
      * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
      * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
      */
-    public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
+    @SafeVarargs
+    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
     {
         return disruptor.createEventProcessors(sequences, eventProcessorFactories);
     }
@@ -177,7 +182,8 @@ public class EventHandlerGroup<T>
      * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
      * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors.
      */
-    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
+    @SafeVarargs
+    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
     {
         return disruptor.createWorkerPool(sequences, handlers);
     }


=====================================
src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
=====================================
--- a/src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
+++ b/src/main/java/com/lmax/disruptor/dsl/ExceptionHandlerSetting.java
@@ -17,6 +17,7 @@ package com.lmax.disruptor.dsl;
 
 import com.lmax.disruptor.BatchEventProcessor;
 import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventProcessor;
 import com.lmax.disruptor.ExceptionHandler;
 
 /**
@@ -44,10 +45,20 @@ public class ExceptionHandlerSetting<T>
      *
      * @param exceptionHandler the exception handler to use.
      */
+    @SuppressWarnings("unchecked")
     public void with(ExceptionHandler<? super T> exceptionHandler)
     {
-        ((BatchEventProcessor<T>) consumerRepository.getEventProcessorFor(eventHandler))
-            .setExceptionHandler(exceptionHandler);
-        consumerRepository.getBarrierFor(eventHandler).alert();
+        final EventProcessor eventProcessor = consumerRepository.getEventProcessorFor(eventHandler);
+        if (eventProcessor instanceof BatchEventProcessor)
+        {
+            ((BatchEventProcessor<T>) eventProcessor).setExceptionHandler(exceptionHandler);
+            consumerRepository.getBarrierFor(eventHandler).alert();
+        }
+        else
+        {
+            throw new RuntimeException(
+                "EventProcessor: " + eventProcessor + " is not a BatchEventProcessor " +
+                "and does not support exception handlers");
+        }
     }
 }


=====================================
src/main/java/com/lmax/disruptor/util/ThreadHints.java
=====================================
--- /dev/null
+++ b/src/main/java/com/lmax/disruptor/util/ThreadHints.java
@@ -0,0 +1,77 @@
+/*  Copyright 2016 Gil Tene
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lmax.disruptor.util;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+
+
+import static java.lang.invoke.MethodType.methodType;
+
+/**
+ * This class captures possible hints that may be used by some
+ * runtimes to improve code performance. It is intended to capture hinting
+ * behaviours that are implemented in or anticipated to be spec'ed under the
+ * {@link java.lang.Thread} class in some Java SE versions, but missing in prior
+ * versions.
+ */
+public final class ThreadHints
+{
+    private static final MethodHandle ON_SPIN_WAIT_METHOD_HANDLE;
+
+    static
+    {
+        final MethodHandles.Lookup lookup = MethodHandles.lookup();
+
+        MethodHandle methodHandle = null;
+        try
+        {
+            methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
+        }
+        catch (final Exception ignore)
+        {
+        }
+
+        ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
+    }
+
+    private ThreadHints()
+    {
+    }
+
+    /**
+     * Indicates that the caller is momentarily unable to progress, until the
+     * occurrence of one or more actions on the part of other activities.  By
+     * invoking this method within each iteration of a spin-wait loop construct,
+     * the calling thread indicates to the runtime that it is busy-waiting. The runtime
+     * may take action to improve the performance of invoking spin-wait loop constructions.
+     */
+    public static void onSpinWait()
+    {
+        // Call java.lang.Thread.onSpinWait() on Java SE versions that support it. Do nothing otherwise.
+        // This should optimize away to either nothing or to an inlining of java.lang.Thread.onSpinWait()
+        if (null != ON_SPIN_WAIT_METHOD_HANDLE)
+        {
+            try
+            {
+                ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
+            }
+            catch (final Throwable ignore)
+            {
+            }
+        }
+    }
+}
\ No newline at end of file


=====================================
src/main/java/com/lmax/disruptor/util/Util.java
=====================================
--- a/src/main/java/com/lmax/disruptor/util/Util.java
+++ b/src/main/java/com/lmax/disruptor/util/Util.java
@@ -16,16 +16,15 @@
 package com.lmax.disruptor.util;
 
 import java.lang.reflect.Field;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.PrivilegedExceptionAction;
 
-import sun.misc.Unsafe;
-
 import com.lmax.disruptor.EventProcessor;
 import com.lmax.disruptor.Sequence;
 
+
+import sun.misc.Unsafe;
+
 /**
  * Set of common functions used by the Disruptor
  */
@@ -127,28 +126,6 @@ public final class Util
     }
 
     /**
-     * Gets the address value for the memory that backs a direct byte buffer.
-     *
-     * @param buffer a direct buffer to get the address from.
-     * @return The system address for the buffers
-     */
-    @Deprecated
-    public static long getAddressFromDirectByteBuffer(ByteBuffer buffer)
-    {
-        try
-        {
-            Field addressField = Buffer.class.getDeclaredField("address");
-            addressField.setAccessible(true);
-            return addressField.getLong(buffer);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("Unable to address field from ByteBuffer", e);
-        }
-    }
-
-
-    /**
      * Calculate the log base 2 of the supplied integer, essentially reports the location
      * of the highest bit.
      *


=====================================
src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
=====================================
--- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java
@@ -1,24 +1,16 @@
 package com.lmax.disruptor.offheap;
 
+import com.lmax.disruptor.*;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.LockSupport;
 
-import com.lmax.disruptor.AbstractPerfTestDisruptor;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.DataProvider;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.Sequencer;
-import com.lmax.disruptor.SingleProducerSequencer;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-
 public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
 {
     private static final int BLOCK_SIZE = 256;
@@ -99,9 +91,10 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
         @Override
         public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws Exception
         {
-            for (int i = 0; i < BLOCK_SIZE; i += 8)
+            final int start = event.position();
+            for (int i = start, size = start + BLOCK_SIZE; i < size; i += 8)
             {
-                total += event.getLong();
+                total += event.getLong(i);
             }
 
             if (--expectedCount == 0)
@@ -134,7 +127,7 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
             @Override
             protected ByteBuffer initialValue()
             {
-                return buffer.duplicate();
+                return buffer.duplicate().order(ByteOrder.nativeOrder());
             }
         };
 
@@ -143,7 +136,7 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
             this.sequencer = sequencer;
             this.entrySize = entrySize;
             this.mask = sequencer.getBufferSize() - 1;
-            buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize);
+            buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize).order(ByteOrder.nativeOrder());
         }
 
         public void addGatingSequences(Sequence sequence)


=====================================
src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
=====================================
--- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java
@@ -1,32 +1,30 @@
 package com.lmax.disruptor.offheap;
 
+import com.lmax.disruptor.*;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.LockSupport;
 
-import com.lmax.disruptor.AbstractPerfTestDisruptor;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-
 public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
 {
     private static final int BLOCK_SIZE = 256;
     private static final int BUFFER_SIZE = 1024 * 1024;
     private static final long ITERATIONS = 1000 * 1000 * 10L;
 
+    private static final boolean SLICED_BUFFER = Boolean.getBoolean("sliced");
     private final Executor executor = Executors.newFixedThreadPool(1, DaemonThreadFactory.INSTANCE);
     private final WaitStrategy waitStrategy = new YieldingWaitStrategy();
     private final RingBuffer<ByteBuffer> buffer =
-        RingBuffer.createSingleProducer(BufferFactory.direct(BLOCK_SIZE), BUFFER_SIZE, waitStrategy);
-    private final ByteBufferHandler handler = new ByteBufferHandler();
+        RingBuffer.createSingleProducer(
+            SLICED_BUFFER ? SlicedBufferFactory.direct(BLOCK_SIZE, BUFFER_SIZE) : BufferFactory.direct(BLOCK_SIZE),
+            BUFFER_SIZE, waitStrategy);
+        private final ByteBufferHandler handler = new ByteBufferHandler();
     private final BatchEventProcessor<ByteBuffer> processor =
         new BatchEventProcessor<ByteBuffer>(buffer, buffer.newBarrier(), handler);
 
@@ -103,7 +101,7 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
         {
             for (int i = 0; i < BLOCK_SIZE; i += 8)
             {
-                total += event.getLong();
+                total += event.getLong(i);
             }
 
             if (--expectedCount == 0)
@@ -140,11 +138,11 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
         {
             if (isDirect)
             {
-                return ByteBuffer.allocateDirect(size);
+                return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
             }
             else
             {
-                return ByteBuffer.allocate(size);
+                return ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
             }
         }
 
@@ -159,4 +157,51 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
             return new BufferFactory(false, size);
         }
     }
+
+    private static final class SlicedBufferFactory implements EventFactory<ByteBuffer>
+    {
+        private final boolean isDirect;
+        private final int size;
+        private final int total;
+        private ByteBuffer buffer;
+
+        private SlicedBufferFactory(boolean isDirect, int size, int total)
+        {
+            this.isDirect = isDirect;
+            this.size = size;
+            this.total = total;
+            this.buffer =
+                (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
+                    .order(ByteOrder.nativeOrder());
+            this.buffer.limit(0);
+        }
+
+        @Override
+        public ByteBuffer newInstance()
+        {
+            if (this.buffer.limit() == this.buffer.capacity())
+            {
+                this.buffer =
+                    (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
+                        .order(ByteOrder.nativeOrder());
+                this.buffer.limit(0);
+            }
+            final int limit = this.buffer.limit();
+            this.buffer.limit(limit + size);
+            this.buffer.position(limit);
+            final ByteBuffer slice = this.buffer.slice().order(ByteOrder.nativeOrder());
+            return slice;
+        }
+
+        public static SlicedBufferFactory direct(int size, int total)
+        {
+            return new SlicedBufferFactory(true, size, total);
+        }
+
+        @SuppressWarnings("unused")
+        public static SlicedBufferFactory heap(int size, int total)
+        {
+            return new SlicedBufferFactory(false, size, total);
+        }
+    }
 }


=====================================
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
+++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
@@ -205,7 +205,7 @@ public final class BatchEventProcessorTest
         };
 
         final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler();
-        final BatchEventProcessor p1 = new BatchEventProcessor<Object>(dp, barrier, h1);
+        final BatchEventProcessor p1 = new BatchEventProcessor<>(dp, barrier, h1);
 
         Thread t1 = new Thread(p1);
         p1.halt();
@@ -217,7 +217,7 @@ public final class BatchEventProcessorTest
         for (int i = 0; i < 1000; i++)
         {
             final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
-            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2);
             Thread t2 = new Thread(p2);
             t2.start();
             p2.halt();
@@ -229,7 +229,7 @@ public final class BatchEventProcessorTest
         for (int i = 0; i < 1000; i++)
         {
             final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler();
-            final BatchEventProcessor p2 = new BatchEventProcessor<Object>(dp, barrier, h2);
+            final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2);
             Thread t2 = new Thread(p2);
             t2.start();
             Thread.yield();


=====================================
src/test/java/com/lmax/disruptor/RingBufferTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/RingBufferTest.java
+++ b/src/test/java/com/lmax/disruptor/RingBufferTest.java
@@ -599,6 +599,22 @@ public class RingBufferTest
         try
         {
             ringBuffer.publishEvents(new EventTranslator[]{translator, translator, translator, translator}, 1, 0);
+        }
+        finally
+        {
+            assertEmptyRingBuffer(ringBuffer);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotTryPublishEventsWhenBatchSizeIs0() throws Exception
+    {
+        RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+        EventTranslator<Object[]> translator = new NoArgEventTranslator();
+
+        try
+        {
             ringBuffer.tryPublishEvents(new EventTranslator[]{translator, translator, translator, translator}, 1, 0);
         }
         finally
@@ -717,7 +733,22 @@ public class RingBufferTest
         try
         {
             ringBuffer.publishEvents(translator, 1, 0, new String[]{"Foo", "Foo"});
-            assertFalse(ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}));
+        }
+        finally
+        {
+            assertEmptyRingBuffer(ringBuffer);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotTryPublishEventsOneArgWhenBatchSizeIs0() throws Exception
+    {
+        RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+        EventTranslatorOneArg<Object[], String> translator = new OneArgEventTranslator();
+
+        try
+        {
+            ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"});
         }
         finally
         {
@@ -829,10 +860,22 @@ public class RingBufferTest
         try
         {
             ringBuffer.publishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"});
-            assertFalse(
-                ringBuffer.tryPublishEvents(
-                    translator, 1, 0, new String[]{"Foo", "Foo"},
-                    new String[]{"Bar", "Bar"}));
+        }
+        finally
+        {
+            assertEmptyRingBuffer(ringBuffer);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotTryPublishEventsTwoArgWhenBatchSizeIs0() throws Exception
+    {
+        RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+        EventTranslatorTwoArg<Object[], String, String> translator = new TwoArgEventTranslator();
+
+        try
+        {
+            ringBuffer.tryPublishEvents(translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"});
         }
         finally
         {
@@ -946,10 +989,24 @@ public class RingBufferTest
             ringBuffer.publishEvents(
                 translator, 1, 0, new String[]{"Foo", "Foo"}, new String[]{"Bar", "Bar"},
                 new String[]{"Baz", "Baz"});
-            assertFalse(
-                ringBuffer.tryPublishEvents(
+        }
+        finally
+        {
+            assertEmptyRingBuffer(ringBuffer);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotTryPublishEventsThreeArgWhenBatchSizeIs0() throws Exception
+    {
+        RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+        EventTranslatorThreeArg<Object[], String, String, String> translator = new ThreeArgEventTranslator();
+
+        try
+        {
+            ringBuffer.tryPublishEvents(
                     translator, 1, 0, new String[]{"Foo", "Foo"},
-                    new String[]{"Bar", "Bar"}, new String[]{"Baz", "Baz"}));
+                    new String[]{"Bar", "Bar"}, new String[]{"Baz", "Baz"});
         }
         finally
         {
@@ -1079,11 +1136,25 @@ public class RingBufferTest
                     "Foo2", "Bar2",
                     "Baz2", "Bam2"
                 });
-            assertFalse(
-                ringBuffer.tryPublishEvents(
+        }
+        finally
+        {
+            assertEmptyRingBuffer(ringBuffer);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotTryPublishEventsVarArgWhenBatchSizeIs0() throws Exception
+    {
+        RingBuffer<Object[]> ringBuffer = RingBuffer.createSingleProducer(new ArrayFactory(1), 4);
+        VarArgEventTranslator translator = new VarArgEventTranslator();
+
+        try
+        {
+            ringBuffer.tryPublishEvents(
                     translator, 1, 0, new String[]{"Foo0", "Bar0", "Baz0", "Bam0"},
                     new String[]{"Foo1", "Bar1", "Baz1", "Bam1"},
-                    new String[]{"Foo2", "Bar2", "Baz2", "Bam2"}));
+                    new String[]{"Foo2", "Bar2", "Baz2", "Bam2"});
         }
         finally
         {


=====================================
src/test/java/com/lmax/disruptor/collections/HistogramTest.java deleted
=====================================
--- a/src/test/java/com/lmax/disruptor/collections/HistogramTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright 2011 LMAX Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.lmax.disruptor.collections;
-
-import org.junit.Test;
-
-import java.math.BigDecimal;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class HistogramTest
-{
-    public static final long[] INTERVALS = new long[]{1, 10, 100, 1000, Long.MAX_VALUE};
-    private Histogram histogram = new Histogram(INTERVALS);
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowExceptionOnInvalidBounds()
-    {
-        new Histogram(new long[]{});
-    }
-
-    @Test
-    public void shouldSizeBasedOnBucketConfiguration()
-    {
-        assertThat(Long.valueOf(histogram.getSize()), is(Long.valueOf(INTERVALS.length)));
-    }
-
-    @Test
-    public void shouldWalkIntervals()
-    {
-        for (int i = 0, size = histogram.getSize(); i < size; i++)
-        {
-            assertThat(Long.valueOf(histogram.getUpperBoundAt(i)), is(Long.valueOf(INTERVALS[i])));
-        }
-    }
-
-    @Test
-    public void shouldConfirmIntervalsAreInitialised()
-    {
-        for (int i = 0, size = histogram.getSize(); i < size; i++)
-        {
-            assertThat(Long.valueOf(histogram.getCountAt(i)), is(Long.valueOf(0L)));
-        }
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowExceptionWhenIntervalLessThanOrEqualToZero()
-    {
-        new Histogram(new long[]{-1, 10, 20});
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowExceptionWhenIntervalDoNotIncrease()
-    {
-        new Histogram(new long[]{1, 10, 10, 20});
-    }
-
-    @Test
-    public void shouldAddObservation()
-    {
-        assertTrue(histogram.addObservation(10L));
-        assertThat(Long.valueOf(histogram.getCountAt(1)), is(Long.valueOf(1L)));
-    }
-
-    @Test
-    public void shouldNotAddObservation()
-    {
-        Histogram histogram = new Histogram(new long[]{10, 20, 30});
-        assertFalse(histogram.addObservation(31));
-    }
-
-    @Test
-    public void shouldAddObservations()
-    {
-        addObservations(histogram, 10L, 30L, 50L);
-
-        Histogram histogram2 = new Histogram(INTERVALS);
-        addObservations(histogram2, 10L, 20L, 25L);
-
-        histogram.addObservations(histogram2);
-
-        assertThat(Long.valueOf(6L), is(Long.valueOf(histogram.getCount())));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowExceptionWhenIntervalsDoNotMatch()
-    {
-        long[] intervals = INTERVALS.clone();
-        intervals[0]++;
-
-        Histogram histogram2 = new Histogram(intervals);
-        histogram.addObservations(histogram2);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowExceptionWhenIntervalsLengthDoNotMatch()
-    {
-        Histogram histogram2 = new Histogram(new long[]{1L, 2L, 3L});
-        histogram.addObservations(histogram2);
-    }
-
-    @Test
-    public void shouldClearCounts()
-    {
-        addObservations(histogram, 1L, 7L, 10L, 3000L);
-        histogram.clear();
-
-        for (int i = 0, size = histogram.getSize(); i < size; i++)
-        {
-            assertThat(Long.valueOf(histogram.getCountAt(i)), is(Long.valueOf(0)));
-        }
-    }
-
-    @Test
-    public void shouldCountTotalObservations()
-    {
-        addObservations(histogram, 1L, 7L, 10L, 3000L);
-
-        assertThat(Long.valueOf(histogram.getCount()), is(Long.valueOf(4L)));
-    }
-
-    @Test
-    public void shouldGetMeanZeroObservation()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        assertThat(histogram.getMean(), is(BigDecimal.ZERO));
-    }
-
-    @Test
-    public void shouldGetMeanObservation()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
-        assertThat(histogram.getMean(), is(new BigDecimal("32.67")));
-    }
-
-    @Test
-    public void shouldCorrectMeanForSkewInTopAndBottomPopulatedIntervals()
-    {
-        final long[] intervals = new long[]{100, 110, 120, 130, 140, 150, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        for (long i = 100; i < 152; i++)
-        {
-            histogram.addObservation(i);
-        }
-
-        assertThat(histogram.getMean(), is(new BigDecimal("125.02")));
-    }
-
-    @Test
-    public void shouldGetMaxObservation()
-    {
-        addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
-        assertThat(Long.valueOf(histogram.getMax()), is(Long.valueOf(144L)));
-    }
-
-    @Test
-    public void shouldGetMinObservation()
-    {
-        addObservations(histogram, 1L, 7L, 10L, 10L, 11L, 144L);
-
-        assertThat(Long.valueOf(histogram.getMin()), is(Long.valueOf(1L)));
-    }
-
-    @Test
-    public void shouldGetMinAndMaxOfSingleObservation()
-    {
-        addObservations(histogram, 10L);
-
-        assertThat(Long.valueOf(histogram.getMin()), is(Long.valueOf(10L)));
-        assertThat(Long.valueOf(histogram.getMax()), is(Long.valueOf(10L)));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldGetExceptionUpperBoundForFactorZero()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        Long.valueOf(histogram.getUpperBoundForFactor(0.0));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldGetExceptionUpperBoundForFactorOne()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        Long.valueOf(histogram.getUpperBoundForFactor(1.0));
-    }
-
-    @Test
-    public void shouldGetZeroUpperBound()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        assertThat(Long.valueOf(histogram.getUpperBoundForFactor(0.5)), is(Long.valueOf(0L)));
-    }
-
-    @Test
-    public void shouldGetTwoNinesUpperBound()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        for (long i = 1; i < 101; i++)
-        {
-            histogram.addObservation(i);
-        }
-
-        assertThat(Long.valueOf(histogram.getTwoNinesUpperBound()), is(Long.valueOf(100L)));
-    }
-
-    @Test
-    public void shouldGetFourNinesUpperBound()
-    {
-        final long[] intervals = new long[]{1, 10, 100, 1000, 10000};
-        final Histogram histogram = new Histogram(intervals);
-
-        for (long i = 1; i < 102; i++)
-        {
-            histogram.addObservation(i);
-        }
-
-        assertThat(Long.valueOf(histogram.getFourNinesUpperBound()), is(Long.valueOf(1000L)));
-    }
-
-    @Test
-    public void shouldToString()
-    {
-        addObservations(histogram, 1L, 7L, 10L, 300L);
-
-        String expectedResults =
-            "Histogram{min=1, max=300, mean=53.25, 99%=1000, 99.99%=1000, [1=1, 10=2, 100=0, 1000=1, 9223372036854775807=0]}";
-        assertThat(histogram.toString(), is(expectedResults));
-    }
-
-    private void addObservations(final Histogram histogram, final long... observations)
-    {
-        for (int i = 0, size = observations.length; i < size; i++)
-        {
-            histogram.addObservation(observations[i]);
-        }
-    }
-}


=====================================
src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
+++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
@@ -20,6 +20,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.EventProcessor;
 import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.EventTranslatorOneArg;
 import com.lmax.disruptor.ExceptionHandler;
 import com.lmax.disruptor.FatalExceptionHandler;
 import com.lmax.disruptor.RingBuffer;
@@ -134,6 +135,40 @@ public class DisruptorTest
         }
     }
 
+
+    @Test
+    public void shouldBatchOfEvents() throws Exception
+    {
+        final CountDownLatch eventCounter = new CountDownLatch(2);
+        disruptor.handleEventsWith(new EventHandler<TestEvent>()
+        {
+            @Override
+            public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
+            {
+                eventCounter.countDown();
+            }
+        });
+
+        disruptor.start();
+
+        disruptor.publishEvents(
+            new EventTranslatorOneArg<TestEvent, Object>()
+            {
+                @Override
+                public void translateTo(final TestEvent event, final long sequence, Object arg)
+                {
+                    lastPublishedEvent = event;
+                }
+            },
+            new Object[] { "a", "b" }
+        );
+
+        if (!eventCounter.await(5, TimeUnit.SECONDS))
+        {
+            fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
+        }
+    }
+
     @Test
     public void shouldAddEventProcessorsAfterPublishing() throws Exception
     {
@@ -323,6 +358,7 @@ public class DisruptorTest
         disruptor.after(handler2);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors()
         throws Exception
@@ -341,6 +377,7 @@ public class DisruptorTest
         assertSame(testException, actualException);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors()
         throws Exception


=====================================
src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
=====================================
--- /dev/null
+++ b/src/test/java/com/lmax/disruptor/example/ShutdownOnError.java
@@ -0,0 +1,132 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ShutdownOnError
+{
+    private static class Event
+    {
+        public long value;
+
+        public static final EventFactory<Event> FACTORY = new EventFactory<Event>()
+        {
+            @Override
+            public Event newInstance()
+            {
+                return new Event();
+            }
+        };
+    }
+
+    private static class DefaultThreadFactory implements ThreadFactory
+    {
+        @Override
+        public Thread newThread(Runnable r)
+        {
+            return new Thread(r);
+        }
+    }
+
+    private static class Handler implements EventHandler<Event>
+    {
+        @Override
+        public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception
+        {
+            // do work, if a failure occurs throw exception.
+        }
+    }
+
+    private static final class ErrorHandler implements ExceptionHandler<Event>
+    {
+        private final AtomicBoolean running;
+
+        private ErrorHandler(AtomicBoolean running)
+        {
+            this.running = running;
+        }
+
+        @Override
+        public void handleEventException(Throwable ex, long sequence, Event event)
+        {
+            if (execeptionIsFatal(ex))
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+
+        private boolean execeptionIsFatal(Throwable ex)
+        {
+            // Do what is appropriate here.
+            return true;
+        }
+
+        @Override
+        public void handleOnStartException(Throwable ex)
+        {
+
+        }
+
+        @Override
+        public void handleOnShutdownException(Throwable ex)
+        {
+
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        Disruptor<Event> disruptor = new Disruptor<>(Event.FACTORY, 1024, new DefaultThreadFactory());
+
+        AtomicBoolean running = new AtomicBoolean(true);
+
+        ErrorHandler errorHandler = new ErrorHandler(running);
+
+        final Handler handler = new Handler();
+        disruptor.handleEventsWith(handler);
+        disruptor.handleExceptionsFor(handler).with(errorHandler);
+
+        simplePublish(disruptor, running);
+    }
+
+    private static void simplePublish(Disruptor<Event> disruptor, AtomicBoolean running)
+    {
+        while (running.get())
+        {
+            disruptor.publishEvent(new EventTranslator<Event>()
+            {
+                @Override
+                public void translateTo(Event event, long sequence)
+                {
+                    event.value = sequence;
+                }
+            });
+        }
+    }
+
+    private static void smarterPublish(Disruptor<Event> disruptor, AtomicBoolean running)
+    {
+        final RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
+
+        boolean publishOk;
+        do
+        {
+            publishOk = ringBuffer.tryPublishEvent(new EventTranslator<Event>()
+            {
+                @Override
+                public void translateTo(Event event, long sequence)
+                {
+                    event.value = sequence;
+                }
+            });
+        }
+        while (publishOk && running.get());
+    }
+}


=====================================
src/test/java/com/lmax/disruptor/util/UtilTest.java
=====================================
--- a/src/test/java/com/lmax/disruptor/util/UtilTest.java
+++ b/src/test/java/com/lmax/disruptor/util/UtilTest.java
@@ -16,15 +16,10 @@
 package com.lmax.disruptor.util;
 
 import com.lmax.disruptor.Sequence;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
-
 public final class UtilTest
 {
     @Test
@@ -57,11 +52,4 @@ public final class UtilTest
 
         Assert.assertEquals(Long.MAX_VALUE, Util.getMinimumSequence(sequences));
     }
-
-    @Test
-    public void shouldGetByteBufferAddress() throws Exception
-    {
-        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
-        assertThat(Util.getAddressFromDirectByteBuffer(buffer), is(not(0L)));
-    }
 }



View it on GitLab: https://salsa.debian.org/java-team/disruptor/commit/fa9451b4f7fd9acaee9df7b620f8ce48d806bfe0

-- 
View it on GitLab: https://salsa.debian.org/java-team/disruptor/commit/fa9451b4f7fd9acaee9df7b620f8ce48d806bfe0
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20180625/51b5e8d0/attachment.html>


More information about the pkg-java-commits mailing list