[jboss-xnio] 07/11: Revert "Revert "Imported Upstream version 3.3.3""

Markus Koschany apo-guest at moszumanska.debian.org
Mon Dec 21 22:41:48 UTC 2015


This is an automated email from the git hooks/post-receive script.

apo-guest pushed a commit to branch master
in repository jboss-xnio.

commit bfd68eb5d25756f2eefdf01a36a3389bce69e4ec
Author: Markus Koschany <apo at debian.org>
Date:   Mon Dec 21 23:20:43 2015 +0100

    Revert "Revert "Imported Upstream version 3.3.3""
    
    This reverts commit 45497b8d8818a62c0359395e2ea2e0701ba75be8.
---
 api/pom.xml                                        |   2 +-
 .../main/java/org/xnio/ByteBufferSlicePool.java    |  55 +++--
 api/src/main/java/org/xnio/XnioWorker.java         |  10 +-
 api/src/test/java/org/xnio/XnioWorkerTestCase.java |   5 +
 .../test/java/org/xnio/mock/XnioWorkerMock.java    |   6 +-
 nio-impl/pom.xml                                   |   2 +-
 .../src/main/java/org/xnio/nio/ChannelClosed.java  |  26 +++
 .../org/xnio/nio/NioSocketStreamConnection.java    |  10 +-
 .../src/main/java/org/xnio/nio/NioTcpServer.java   |  26 ++-
 .../main/java/org/xnio/nio/NioTcpServerHandle.java |   4 +-
 nio-impl/src/main/java/org/xnio/nio/NioXnio.java   |  33 ++-
 .../src/main/java/org/xnio/nio/NioXnioWorker.java  |  36 ++-
 .../{NioTcpServer.java => QueuedNioTcpServer.java} | 253 ++++++++++-----------
 .../org/xnio/nio/QueuedNioTcpServerHandle.java     |  60 +++++
 .../src/main/java/org/xnio/nio/WorkerThread.java   |  34 ++-
 .../xnio/nio/test/IllegalConnectionTestCase.java   |   2 +
 .../java/org/xnio/nio/test/TcpChannelTestCase.java |   7 +-
 .../org/xnio/nio/test/TcpConnectionTestCase.java   |   7 +-
 pom.xml                                            |   2 +-
 19 files changed, 383 insertions(+), 197 deletions(-)

diff --git a/api/pom.xml b/api/pom.xml
index 40724ae..35f64c4 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -36,7 +36,7 @@
     <parent>
         <groupId>org.jboss.xnio</groupId>
         <artifactId>xnio-all</artifactId>
-        <version>3.3.2.Final</version>
+        <version>3.3.3.Final</version>
     </parent>
 
     <dependencies>
diff --git a/api/src/main/java/org/xnio/ByteBufferSlicePool.java b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
index 61913c4..2f5f1ef 100644
--- a/api/src/main/java/org/xnio/ByteBufferSlicePool.java
+++ b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
@@ -61,7 +61,7 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
     private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
         protected ThreadLocalCache initialValue() {
             //noinspection serial
-            return new ThreadLocalCache(this);
+            return new ThreadLocalCache();
         }
 
         public void remove() {
@@ -120,13 +120,16 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
 
     /** {@inheritDoc} */
     public Pooled<ByteBuffer> allocate() {
-        ThreadLocalCache localCache = localQueueHolder.get();
-        if(localCache.outstanding != threadLocalQueueSize) {
-            localCache.outstanding++;
-        }
-        Slice slice = localCache.queue.poll();
-        if (slice != null) {
-            return new PooledByteBuffer(slice, slice.slice());
+        Slice slice;
+        if (threadLocalQueueSize > 0) {
+            ThreadLocalCache localCache = localQueueHolder.get();
+            if(localCache.outstanding != threadLocalQueueSize) {
+                localCache.outstanding++;
+            }
+            slice = localCache.queue.poll();
+            if (slice != null) {
+                return new PooledByteBuffer(slice, slice.slice());
+            }
         }
         final Queue<Slice> sliceQueue = this.sliceQueue;
         slice = sliceQueue.poll();
@@ -159,17 +162,21 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
     }
 
     private void doFree(Slice region) {
-        final ThreadLocalCache localCache = localQueueHolder.get();
-        boolean cacheOk = false;
-        if(localCache.outstanding > 0) {
-            localCache.outstanding--;
-            cacheOk = true;
-        }
-        ArrayDeque<Slice> localQueue = localCache.queue;
-        if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
-            sliceQueue.add(region);
+        if (threadLocalQueueSize > 0) {
+            final ThreadLocalCache localCache = localQueueHolder.get();
+            boolean cacheOk = false;
+            if(localCache.outstanding > 0) {
+                localCache.outstanding--;
+                cacheOk = true;
+            }
+            ArrayDeque<Slice> localQueue = localCache.queue;
+            if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
+                sliceQueue.add(region);
+            } else {
+                localQueue.add(region);
+            }
         } else {
-            localQueue.add(region);
+            sliceQueue.add(region);
         }
     }
 
@@ -245,22 +252,24 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
 
     private final class ThreadLocalCache {
 
-        private final ThreadLocal<ThreadLocalCache> threadLocal;
-
         final ArrayDeque<Slice> queue =  new ArrayDeque<Slice>(threadLocalQueueSize) {
 
             /**
              * This sucks but there's no other way to ensure these buffers are returned to the pool.
              */
             protected void finalize() {
-                threadLocal.remove();
+                final ArrayDeque<Slice> deque = queue;
+                Slice slice = deque.poll();
+                while (slice != null) {
+                    doFree(slice);
+                    slice = deque.poll();
+                }
             }
         };
 
         int outstanding = 0;
 
-        private ThreadLocalCache(ThreadLocal<ThreadLocalCache> threadLocal) {
-            this.threadLocal = threadLocal;
+        ThreadLocalCache() {
         }
     }
 }
diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java
index 52a01af..dacc224 100644
--- a/api/src/main/java/org/xnio/XnioWorker.java
+++ b/api/src/main/java/org/xnio/XnioWorker.java
@@ -199,7 +199,7 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
             }
 
             public XnioIoThread getIoThread() {
-                return chooseThread();
+                return server.getIoThread();
             }
 
             public void close() throws IOException {
@@ -702,6 +702,14 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
     }
 
     /**
+     * Get an I/O thread from this worker.  The thread is chosen based on the given hash code.
+     *
+     * @param hashCode the hash code
+     * @return the thread
+     */
+    public abstract XnioIoThread getIoThread(int hashCode);
+
+    /**
      * Get the user task to run once termination is complete.
      *
      * @return the termination task
diff --git a/api/src/test/java/org/xnio/XnioWorkerTestCase.java b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
index 49c0566..4f20017 100644
--- a/api/src/test/java/org/xnio/XnioWorkerTestCase.java
+++ b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
@@ -805,6 +805,11 @@ public class XnioWorkerTestCase {
             }
 
             @Override
+            public XnioIoThread getIoThread(final int hashCode) {
+                return new XnioIoThreadMock(this);
+            }
+
+            @Override
             public int getIoThreadCount() {
                 return 0;
             }
diff --git a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
index dc635a9..f489eef 100644
--- a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
+++ b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
@@ -97,6 +97,11 @@ public class XnioWorkerMock extends XnioWorker {
         return mockThread;
     }
 
+    @Override
+    public XnioIoThreadMock getIoThread(final int hashCode) {
+        return mockThread;
+    }
+
     /**
      * Returns the connect behavior of this worker mock.
      */
@@ -179,5 +184,4 @@ public class XnioWorkerMock extends XnioWorker {
     @Override
     public void awaitTermination() throws InterruptedException {
     }
-
 }
diff --git a/nio-impl/pom.xml b/nio-impl/pom.xml
index 4c0a65b..badecd8 100644
--- a/nio-impl/pom.xml
+++ b/nio-impl/pom.xml
@@ -31,7 +31,7 @@
     <parent>
         <groupId>org.jboss.xnio</groupId>
         <artifactId>xnio-all</artifactId>
-        <version>3.3.2.Final</version>
+        <version>3.3.3.Final</version>
     </parent>
     
     <properties>
diff --git a/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java b/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
new file mode 100644
index 0000000..166bc93
--- /dev/null
+++ b/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.nio;
+
+/**
+ * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ */
+public interface ChannelClosed {
+    void channelClosed();
+}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
index 867ec4c..fc18c5b 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
@@ -34,14 +34,14 @@ import org.xnio.Options;
  */
 final class NioSocketStreamConnection extends AbstractNioStreamConnection {
 
-    private final NioTcpServerHandle serverConduit;
+    private final ChannelClosed closedHandle;
     private final NioSocketConduit conduit;
 
-    NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final NioTcpServerHandle serverConduit) {
+    NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final ChannelClosed closedHandle) {
         super(workerThread);
         conduit = new NioSocketConduit(workerThread, key, this);
         key.attach(conduit);
-        this.serverConduit = serverConduit;
+        this.closedHandle = closedHandle;
         setSinkConduit(conduit);
         setSourceConduit(conduit);
     }
@@ -135,8 +135,8 @@ final class NioSocketStreamConnection extends AbstractNioStreamConnection {
             conduit.getSocketChannel().close();
         } catch (ClosedChannelException ignored) {
         } finally {
-            final NioTcpServerHandle conduit = this.serverConduit;
-            if (conduit!= null) conduit.channelClosed();
+            final ChannelClosed closedHandle = this.closedHandle;
+            if (closedHandle!= null) closedHandle.channelClosed();
         }
     }
 
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
index 9e659d8..a8f8351 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
@@ -20,6 +20,7 @@ package org.xnio.nio;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.jboss.logging.Logger;
 import org.xnio.IoUtils;
+import org.xnio.LocalSocketAddress;
 import org.xnio.Option;
 import org.xnio.ChannelListener;
 import org.xnio.OptionMap;
@@ -384,6 +386,25 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         try {
             accepted = channel.accept();
             if (accepted != null) try {
+                final SocketAddress localAddress = accepted.getLocalAddress();
+                int hash;
+                if (localAddress instanceof InetSocketAddress) {
+                    final InetSocketAddress address = (InetSocketAddress) localAddress;
+                    hash = address.getAddress().hashCode() * 23 + address.getPort();
+                } else if (localAddress instanceof LocalSocketAddress) {
+                    hash = ((LocalSocketAddress) localAddress).getName().hashCode();
+                } else {
+                    hash = localAddress.hashCode();
+                }
+                final SocketAddress remoteAddress = accepted.getRemoteAddress();
+                if (remoteAddress instanceof InetSocketAddress) {
+                    final InetSocketAddress address = (InetSocketAddress) remoteAddress;
+                    hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
+                } else if (remoteAddress instanceof LocalSocketAddress) {
+                    hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
+                } else {
+                    hash = localAddress.hashCode() * 23 + hash;
+                }
                 accepted.configureBlocking(false);
                 final Socket socket = accepted.socket();
                 socket.setKeepAlive(keepAlive != 0);
@@ -391,8 +412,9 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
                 socket.setTcpNoDelay(tcpNoDelay != 0);
                 final int sendBuffer = this.sendBuffer;
                 if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
-                final SelectionKey selectionKey = current.registerChannel(accepted);
-                final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(current, selectionKey, handle);
+                final WorkerThread ioThread = worker.getIoThread(hash);
+                final SelectionKey selectionKey = ioThread.registerChannel(accepted);
+                final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
                 newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
                 newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
                 ok = true;
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
index 1b20c21..0dd63d3 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
@@ -27,7 +27,7 @@ import static org.xnio.IoUtils.safeClose;
 /**
 * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
 */
-final class NioTcpServerHandle extends NioHandle {
+final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
 
     private final Runnable freeTask;
     private final NioTcpServer server;
@@ -91,7 +91,7 @@ final class NioTcpServerHandle extends NioHandle {
         }
     }
 
-    void channelClosed() {
+    public void channelClosed() {
         final WorkerThread thread = getWorkerThread();
         if (thread == currentThread()) {
             freeConnection();
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
index e9d5280..e7fcc0c 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
@@ -215,22 +215,25 @@ final class NioXnio extends Xnio {
         return super.createFileSystemWatcher(name, options);
     }
 
-    private final ThreadLocal<Selector> selectorThreadLocal = new ThreadLocal<Selector>() {
+    private final ThreadLocal<FinalizableSelectorHolder> selectorThreadLocal = new ThreadLocal<FinalizableSelectorHolder>() {
         public void remove() {
             // if no selector was created, none will be closed
-            IoUtils.safeClose(get());
+            FinalizableSelectorHolder holder = get();
+            if(holder != null) {
+                IoUtils.safeClose(holder.selector);
+            }
             super.remove();
         }
     };
 
     Selector getSelector() throws IOException {
-        final ThreadLocal<Selector> threadLocal = selectorThreadLocal;
-        Selector selector = threadLocal.get();
-        if (selector == null) {
-            selector = tempSelectorCreator.open();
-            threadLocal.set(selector);
+        final ThreadLocal<FinalizableSelectorHolder> threadLocal = selectorThreadLocal;
+        FinalizableSelectorHolder holder = threadLocal.get();
+        if (holder == null) {
+            holder = new FinalizableSelectorHolder(tempSelectorCreator.open());
+            threadLocal.set(holder);
         }
-        return selector;
+        return holder.selector;
     }
 
     private static class DefaultSelectorCreator implements SelectorCreator {
@@ -292,4 +295,18 @@ final class NioXnio extends Xnio {
     protected static Closeable register(XnioServerMXBean serverMXBean) {
         return Xnio.register(serverMXBean);
     }
+
+    private static final class FinalizableSelectorHolder {
+        final Selector selector;
+
+
+        private FinalizableSelectorHolder(Selector selector) {
+            this.selector = selector;
+        }
+
+        @Override
+        protected void finalize() throws Throwable {
+            IoUtils.safeClose(selector);
+        }
+    }
 }
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
index bdd7fdc..9a36028 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
@@ -27,7 +27,7 @@ import java.net.StandardProtocolFamily;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -60,6 +60,7 @@ final class NioXnioWorker extends XnioWorker {
     private volatile int state = 1;
 
     private final WorkerThread[] workerThreads;
+    private final WorkerThread acceptThread;
     private final Closeable mbeanHandle;
 
     @SuppressWarnings("unused")
@@ -100,6 +101,10 @@ final class NioXnioWorker extends XnioWorker {
                 }
                 workerThreads[i] = workerThread;
             }
+            acceptThread = new WorkerThread(this, xnio.mainSelectorCreator.open(), String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
+            if (markWorkerThreadAsDaemon) {
+                acceptThread.setDaemon(true);
+            }
             ok = true;
         } finally {
             if (! ok) {
@@ -145,9 +150,15 @@ final class NioXnioWorker extends XnioWorker {
             openResourceUnconditionally();
             worker.start();
         }
+        openResourceUnconditionally();
+        acceptThread.start();
     }
 
     protected WorkerThread chooseThread() {
+        return getIoThread(ThreadLocalRandom.current().nextInt());
+    }
+
+    public WorkerThread getIoThread(final int hashCode) {
         final WorkerThread[] workerThreads = this.workerThreads;
         final int length = workerThreads.length;
         if (length == 0) {
@@ -156,8 +167,7 @@ final class NioXnioWorker extends XnioWorker {
         if (length == 1) {
             return workerThreads[0];
         }
-        final Random random = IoUtils.getThreadLocalRandom();
-        return workerThreads[random.nextInt(length)];
+        return workerThreads[Math.floorMod(hashCode, length)];
     }
 
     public int getIoThreadCount() {
@@ -181,10 +191,17 @@ final class NioXnioWorker extends XnioWorker {
             } else {
                 channel.socket().bind(bindAddress);
             }
-            final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
-            server.setAcceptListener(acceptListener);
-            ok = true;
-            return server;
+            if (false) {
+                final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
+                server.setAcceptListener(acceptListener);
+                ok = true;
+                return server;
+            } else {
+                final QueuedNioTcpServer server = new QueuedNioTcpServer(this, channel, optionMap);
+                server.setAcceptListener(acceptListener);
+                ok = true;
+                return server;
+            }
         } finally {
             if (! ok) {
                 IoUtils.safeClose(channel);
@@ -275,6 +292,7 @@ final class NioXnioWorker extends XnioWorker {
             for (WorkerThread worker : workerThreads) {
                 worker.shutdown();
             }
+            acceptThread.shutdown();
             shutDownTaskPool();
             return;
         }
@@ -352,4 +370,8 @@ final class NioXnioWorker extends XnioWorker {
     public NioXnio getXnio() {
         return (NioXnio) super.getXnio();
     }
+
+    WorkerThread getAcceptThread() {
+        return acceptThread;
+    }
 }
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
similarity index 68%
copy from nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
copy to nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
index 9e659d8..a401c45 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
+++ b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
@@ -1,7 +1,7 @@
 /*
- * JBoss, Home of Professional Open Source.
- * Copyright 2012 Red Hat, Inc., and individual contributors
- * as indicated by the @author tags.
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,24 +18,33 @@
 
 package org.xnio.nio;
 
+import static org.xnio.IoUtils.safeClose;
+import static org.xnio.nio.Log.log;
+import static org.xnio.nio.Log.tcpServerLog;
+
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
 import org.jboss.logging.Logger;
-import org.xnio.IoUtils;
-import org.xnio.Option;
 import org.xnio.ChannelListener;
+import org.xnio.ChannelListeners;
+import org.xnio.LocalSocketAddress;
+import org.xnio.Option;
 import org.xnio.OptionMap;
 import org.xnio.Options;
 import org.xnio.StreamConnection;
@@ -45,21 +54,20 @@ import org.xnio.channels.AcceptingChannel;
 import org.xnio.channels.UnsupportedOptionException;
 import org.xnio.management.XnioServerMXBean;
 
-import static org.xnio.IoUtils.safeClose;
-import static org.xnio.nio.Log.log;
-import static org.xnio.nio.Log.tcpServerLog;
-
-final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<NioTcpServer> {
-    private static final String FQCN = NioTcpServer.class.getName();
+final class QueuedNioTcpServer extends AbstractNioChannel<QueuedNioTcpServer> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer> {
+    private static final String FQCN = QueuedNioTcpServer.class.getName();
 
-    private volatile ChannelListener<? super NioTcpServer> acceptListener;
+    private volatile ChannelListener<? super QueuedNioTcpServer> acceptListener;
 
-    private final NioTcpServerHandle[] handles;
+    private final QueuedNioTcpServerHandle handle;
+    private final WorkerThread thread;
 
     private final ServerSocketChannel channel;
     private final ServerSocket socket;
     private final Closeable mbeanHandle;
 
+    private final List<BlockingQueue<SocketChannel>> acceptQueues;
+
     private static final Set<Option<?>> options = Option.setBuilder()
             .add(Options.REUSE_ADDRESSES)
             .add(Options.RECEIVE_BUFFER)
@@ -87,8 +95,6 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
     private volatile int readTimeout;
     @SuppressWarnings("unused")
     private volatile int writeTimeout;
-    private volatile int tokenConnectionCount;
-    volatile boolean resumed;
 
     private static final long CONN_LOW_MASK     = 0x000000007FFFFFFFL;
     private static final long CONN_LOW_BIT      = 0L;
@@ -99,34 +105,36 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
     @SuppressWarnings("unused")
     private static final long CONN_HIGH_ONE     = 1L << CONN_HIGH_BIT;
 
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> keepAliveUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "keepAlive");
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> oobInlineUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "oobInline");
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> tcpNoDelayUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "tcpNoDelay");
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> sendBufferUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "sendBuffer");
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "readTimeout");
-    private static final AtomicIntegerFieldUpdater<NioTcpServer> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "writeTimeout");
-
-    private static final AtomicLongFieldUpdater<NioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(NioTcpServer.class, "connectionStatus");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> keepAliveUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "keepAlive");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> oobInlineUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "oobInline");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> tcpNoDelayUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "tcpNoDelay");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> sendBufferUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "sendBuffer");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "readTimeout");
+    private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "writeTimeout");
+
+    private static final AtomicLongFieldUpdater<QueuedNioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(QueuedNioTcpServer.class, "connectionStatus");
+    private final Runnable acceptTask = new Runnable() {
+        public void run() {
+            final WorkerThread current = WorkerThread.getCurrent();
+            assert current != null;
+            final BlockingQueue<SocketChannel> queue = acceptQueues.get(current.getNumber());
+            ChannelListeners.invokeChannelListener(QueuedNioTcpServer.this, getAcceptListener());
+            if (! queue.isEmpty()) {
+                current.execute(this);
+            }
+        }
+    };
 
-    NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
+    QueuedNioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
         super(worker);
         this.channel = channel;
-        final WorkerThread[] threads = worker.getAll();
-        final int threadCount = threads.length;
-        if (threadCount == 0) {
-            throw log.noThreads();
-        }
-        final int tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
-        final int connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
-        if (tokens != -1) {
-            if (tokens < 1 || tokens >= threadCount) {
-                throw log.balancingTokens();
-            }
-            if (connections < 1) {
-                throw log.balancingConnectionCount();
-            }
-            tokenConnectionCount = connections;
+        this.thread = worker.getAcceptThread();
+        final WorkerThread[] workerThreads = worker.getAll();
+        final List<BlockingQueue<SocketChannel>> acceptQueues = new ArrayList<>(workerThreads.length);
+        for (int i = 0; i < workerThreads.length; i++) {
+            acceptQueues.add(i, new LinkedBlockingQueue<SocketChannel>());
         }
+        this.acceptQueues = acceptQueues;
         socket = channel.socket();
         if (optionMap.contains(Options.SEND_BUFFER)) {
             final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
@@ -150,11 +158,11 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         if (optionMap.contains(Options.WRITE_TIMEOUT)) {
             writeTimeoutUpdater.lazySet(this, optionMap.get(Options.WRITE_TIMEOUT, 0));
         }
-        int perThreadLow, perThreadLowRem;
-        int perThreadHigh, perThreadHighRem;
+        final int highWater;
+        final int lowWater;
         if (optionMap.contains(Options.CONNECTION_HIGH_WATER) || optionMap.contains(Options.CONNECTION_LOW_WATER)) {
-            final int highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
-            final int lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
+            highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
+            lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
             if (highWater <= 0) {
                 throw badHighWater();
             }
@@ -163,29 +171,14 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
             }
             final long highLowWater = (long) highWater << CONN_HIGH_BIT | (long) lowWater << CONN_LOW_BIT;
             connectionStatusUpdater.lazySet(this, highLowWater);
-            perThreadLow = lowWater / threadCount;
-            perThreadLowRem = lowWater % threadCount;
-            perThreadHigh = highWater / threadCount;
-            perThreadHighRem = highWater % threadCount;
         } else {
-            perThreadLow = Integer.MAX_VALUE;
-            perThreadLowRem = 0;
-            perThreadHigh = Integer.MAX_VALUE;
-            perThreadHighRem = 0;
+            highWater = Integer.MAX_VALUE;
+            lowWater = Integer.MAX_VALUE;
             connectionStatusUpdater.lazySet(this, CONN_LOW_MASK | CONN_HIGH_MASK);
         }
-        final NioTcpServerHandle[] handles = new NioTcpServerHandle[threadCount];
-        for (int i = 0, length = threadCount; i < length; i++) {
-            final SelectionKey key = threads[i].registerChannel(channel);
-            handles[i] = new NioTcpServerHandle(this, key, threads[i], i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
-            key.attach(handles[i]);
-        }
-        this.handles = handles;
-        if (tokens > 0) {
-            for (int i = 0; i < threadCount; i ++) {
-                handles[i].initializeTokenCount(i < tokens ? connections : 0);
-            }
-        }
+        final SelectionKey key = thread.registerChannel(channel);
+        handle = new QueuedNioTcpServerHandle(this, thread, key, highWater, lowWater);
+        key.attach(handle);
         mbeanHandle = NioXnio.register(new XnioServerMXBean() {
             public String getProviderName() {
                 return "nio";
@@ -200,22 +193,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
             }
 
             public int getConnectionCount() {
-                final AtomicInteger counter = new AtomicInteger();
-                final CountDownLatch latch = new CountDownLatch(handles.length);
-                for (final NioTcpServerHandle handle : handles) {
-                    handle.getWorkerThread().execute(new Runnable() {
-                        public void run() {
-                            counter.getAndAdd(handle.getConnectionCount());
-                            latch.countDown();
-                        }
-                    });
-                }
-                try {
-                    latch.await();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-                return counter.get();
+                return handle.getConnectionCount();
             }
 
             public int getConnectionLimitHighWater() {
@@ -240,9 +218,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         try {
             channel.close();
         } finally {
-            for (NioTcpServerHandle handle : handles) {
-                handle.getWorkerThread().cancelKey(handle.getSelectionKey());
-            }
+            handle.getWorkerThread().cancelKey(handle.getSelectionKey());
             safeClose(mbeanHandle);
         }
     }
@@ -283,7 +259,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         if (option == Options.REUSE_ADDRESSES) {
             old = Boolean.valueOf(socket.getReuseAddress());
             socket.setReuseAddress(Options.REUSE_ADDRESSES.cast(value, Boolean.FALSE).booleanValue());
-        } else if (option == Options.RECEIVE_BUFFER) { 
+        } else if (option == Options.RECEIVE_BUFFER) {
             old = Integer.valueOf(socket.getReceiveBufferSize());
             final int newValue = Options.RECEIVE_BUFFER.cast(value, Integer.valueOf(DEFAULT_BUFFER_SIZE)).intValue();
             if (newValue < 1) {
@@ -345,23 +321,6 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
             }
             newVal = (long)newLowWater << CONN_LOW_BIT | (long)newHighWater << CONN_HIGH_BIT;
         } while (! connectionStatusUpdater.compareAndSet(this, oldVal, newVal));
-
-        final NioTcpServerHandle[] conduits = handles;
-        final int threadCount = conduits.length;
-
-        int perThreadLow, perThreadLowRem;
-        int perThreadHigh, perThreadHighRem;
-
-        perThreadLow = newLowWater / threadCount;
-        perThreadLowRem = newLowWater % threadCount;
-        perThreadHigh = newHighWater / threadCount;
-        perThreadHighRem = newHighWater % threadCount;
-
-        for (int i = 0; i < conduits.length; i++) {
-            NioTcpServerHandle conduit = conduits[i];
-            conduit.executeSetTask(i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
-        }
-
         return oldVal;
     }
 
@@ -375,14 +334,14 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
 
     public NioSocketStreamConnection accept() throws IOException {
         final WorkerThread current = WorkerThread.getCurrent();
-        final NioTcpServerHandle handle = handles[current.getNumber()];
-        if (! handle.getConnection()) {
+        if (current == null) {
             return null;
         }
+        final BlockingQueue<SocketChannel> socketChannels = acceptQueues.get(current.getNumber());
         final SocketChannel accepted;
         boolean ok = false;
         try {
-            accepted = channel.accept();
+            accepted = socketChannels.poll();
             if (accepted != null) try {
                 accepted.configureBlocking(false);
                 final Socket socket = accepted.socket();
@@ -415,16 +374,16 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         return String.format("TCP server (NIO) <%s>", Integer.toHexString(hashCode()));
     }
 
-    public ChannelListener<? super NioTcpServer> getAcceptListener() {
+    public ChannelListener<? super QueuedNioTcpServer> getAcceptListener() {
         return acceptListener;
     }
 
-    public void setAcceptListener(final ChannelListener<? super NioTcpServer> acceptListener) {
+    public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer> acceptListener) {
         this.acceptListener = acceptListener;
     }
 
-    public ChannelListener.Setter<NioTcpServer> getAcceptSetter() {
-        return new AcceptListenerSettable.Setter<NioTcpServer>(this);
+    public ChannelListener.Setter<QueuedNioTcpServer> getAcceptSetter() {
+        return new Setter<QueuedNioTcpServer>(this);
     }
 
     public boolean isOpen() {
@@ -441,37 +400,21 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
     }
 
     public void suspendAccepts() {
-        resumed = false;
-        doResume(0);
+        handle.suspend(0);
     }
 
     public void resumeAccepts() {
-        resumed = true;
-        doResume(SelectionKey.OP_ACCEPT);
+        handle.resume(SelectionKey.OP_ACCEPT);
     }
 
     public boolean isAcceptResumed() {
-        return resumed;
-    }
-
-    private void doResume(final int op) {
-        if (op == 0) {
-            for (NioTcpServerHandle handle : handles) {
-                handle.suspend();
-            }
-        } else {
-            for (NioTcpServerHandle handle : handles) {
-                handle.resume();
-            }
-        }
+        return handle.isResumed(SelectionKey.OP_ACCEPT);
     }
 
     public void wakeupAccepts() {
         tcpServerLog.logf(FQCN, Logger.Level.TRACE, null, "Wake up accepts on %s", this);
         resumeAccepts();
-        final NioTcpServerHandle[] handles = this.handles;
-        final int idx = IoUtils.getThreadLocalRandom().nextInt(handles.length);
-        handles[idx].wakeup(SelectionKey.OP_ACCEPT);
+        handle.wakeup(SelectionKey.OP_ACCEPT);
     }
 
     public void awaitAcceptable() throws IOException {
@@ -487,11 +430,53 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
         return getIoThread();
     }
 
-    NioTcpServerHandle getHandle(final int number) {
-        return handles[number];
-    }
+    void handleReady() {
+        try {
+            final SocketChannel accepted = channel.accept();
+            boolean ok = false;
+            if (accepted != null) try {
+                final SocketAddress localAddress = accepted.getLocalAddress();
+                int hash;
+                if (localAddress instanceof InetSocketAddress) {
+                    final InetSocketAddress address = (InetSocketAddress) localAddress;
+                    hash = address.getAddress().hashCode() * 23 + address.getPort();
+                } else if (localAddress instanceof LocalSocketAddress) {
+                    hash = ((LocalSocketAddress) localAddress).getName().hashCode();
+                } else {
+                    hash = localAddress.hashCode();
+                }
+                final SocketAddress remoteAddress = accepted.getRemoteAddress();
+                if (remoteAddress instanceof InetSocketAddress) {
+                    final InetSocketAddress address = (InetSocketAddress) remoteAddress;
+                    hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
+                } else if (remoteAddress instanceof LocalSocketAddress) {
+                    hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
+                } else {
+                    hash = localAddress.hashCode() * 23 + hash;
+                }
+                accepted.configureBlocking(false);
+                final Socket socket = accepted.socket();
+                socket.setKeepAlive(keepAlive != 0);
+                socket.setOOBInline(oobInline != 0);
+                socket.setTcpNoDelay(tcpNoDelay != 0);
+                final int sendBuffer = this.sendBuffer;
+                if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
+                final WorkerThread ioThread = worker.getIoThread(hash);
+                final SelectionKey selectionKey = ioThread.registerChannel(accepted);
+                final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
+                newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
+                newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
+                ok = true;
+                final int number = ioThread.getNumber();
+                final BlockingQueue<SocketChannel> queue = acceptQueues.get(number);
+                queue.add(accepted);
+                // todo: only execute if necessary
+                ioThread.execute(acceptTask);
+            } finally {
+                if (! ok) safeClose(accepted);
+            }
+        } catch (IOException ignored) {
 
-    int getTokenConnectionCount() {
-        return tokenConnectionCount;
+        }
     }
 }
diff --git a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
new file mode 100644
index 0000000..8a86b70
--- /dev/null
+++ b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.nio;
+
+import static org.xnio.IoUtils.safeClose;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ */
+final class QueuedNioTcpServerHandle extends NioHandle implements ChannelClosed {
+
+    private final QueuedNioTcpServer server;
+
+    QueuedNioTcpServerHandle(final QueuedNioTcpServer server, final WorkerThread workerThread, final SelectionKey key, final int highWater, final int lowWater) {
+        super(workerThread, key);
+        this.server = server;
+    }
+
+    void handleReady(final int ops) {
+        server.handleReady();
+    }
+
+    void forceTermination() {
+        safeClose(server);
+    }
+
+    void terminated() {
+        server.invokeCloseHandler();
+    }
+
+    public void channelClosed() {
+        freeConnection();
+    }
+
+    void freeConnection() {
+        // ignore for now
+    }
+
+    int getConnectionCount() {
+        return -1;
+    }
+}
diff --git a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
index 5ccdb96..19ca1a0 100644
--- a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
+++ b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
@@ -97,7 +97,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
 
     static {
         OLD_LOCKING = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.old-locking", "false")));
-        THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.xnio.thread-safe-selection-keys", "false")));
+        THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.thread-safe-selection-keys", "false")));
     }
 
     WorkerThread(final NioXnioWorker worker, final Selector selector, final String name, final ThreadGroup group, final long stackSize, final int number) {
@@ -419,6 +419,8 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
         }
     }
 
+    volatile boolean polling;
+
     public void run() {
         final Selector selector = this.selector;
         try {
@@ -499,11 +501,29 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
                         selector.selectNow();
                     } else if (delayTime == Long.MAX_VALUE) {
                         selectorLog.tracef("Beginning select on %s", selector);
-                        selector.select();
+                        polling = true;
+                        try {
+                            if (workQueue.peek() != null) {
+                                selector.selectNow();
+                            } else {
+                                selector.select();
+                            }
+                        } finally {
+                            polling = false;
+                        }
                     } else {
                         final long millis = 1L + delayTime / 1000000L;
                         selectorLog.tracef("Beginning select on %s (with timeout)", selector);
-                        selector.select(millis);
+                        polling = true;
+                        try {
+                            if (workQueue.peek() != null) {
+                                selector.selectNow();
+                            } else {
+                                selector.select(millis);
+                            }
+                        } finally {
+                            polling = false;
+                        }
                     }
                 } catch (CancelledKeyException ignored) {
                     // Mac and other buggy implementations sometimes spits these out
@@ -570,7 +590,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
         synchronized (workLock) {
             selectorWorkQueue.add(command);
         }
-        if(currentThread() != this) {
+        if (polling) { // flag is always false if we're the same thread
             selector.wakeup();
         }
     }
@@ -605,7 +625,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
             queue.add(key);
             if (queue.iterator().next() == key) {
                 // we're the next one up; poke the selector to update its delay time
-                if(currentThread() != this) {
+                if (polling) { // flag is always false if we're the same thread
                     selector.wakeup();
                 }
             }
@@ -665,7 +685,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
             try {
                 return channel.register(selector, 0);
             } finally {
-                selector.wakeup();
+                if (polling) selector.wakeup();
             }
         } else {
             final SynchTask task = new SynchTask();
@@ -744,7 +764,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
         } else {
             try {
                 key.interestOps(key.interestOps() | ops);
-                selector.wakeup();
+                if (polling) selector.wakeup();
             } catch (CancelledKeyException ignored) {
             }
         }
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
index 0ad5726..19d4369 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.xnio.OptionMap;
 import org.xnio.Options;
@@ -62,6 +63,7 @@ public class IllegalConnectionTestCase {
     }
 
     @Test
+    @Ignore
     public void illegalAcceptThreads() throws IOException {
         IllegalArgumentException expected;
 
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
index 0ff0675..126f238 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
@@ -30,6 +30,7 @@ import java.net.SocketAddress;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.xnio.ChannelListener;
 import org.xnio.FutureResult;
@@ -80,8 +81,8 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
         }
         final IoFuture<ConnectedStreamChannel> connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, optionMap);
         final FutureResult<ConnectedStreamChannel> accepted = new FutureResult<ConnectedStreamChannel>(xnioWorker);
-        server.getAcceptThread().execute(new Runnable() {
-            public void run() {
+        server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends ConnectedStreamChannel>>() {
+            public void handleEvent(final AcceptingChannel<? extends ConnectedStreamChannel> channel) {
                 try {
                     accepted.setResult(server.accept());
                 } catch (IOException e) {
@@ -89,6 +90,7 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
                 }
             }
         });
+        server.resumeAccepts();
         serverChannel = accepted.getIoFuture().get();
         channel = connectedStreamChannel.get();
         assertNotNull(serverChannel);
@@ -98,6 +100,7 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
     }
 
     @Test
+    @Ignore("unreliable")
     public void optionSetup() throws IOException {
         initChannels();
         final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
index b997afb..13c1387 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
@@ -30,6 +30,7 @@ import java.net.SocketAddress;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.xnio.ChannelListener;
 import org.xnio.FutureResult;
@@ -80,8 +81,8 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
         }
         final IoFuture<StreamConnection> openedConnection = xnioWorker.openStreamConnection(bindAddress, null, optionMap);
         final FutureResult<StreamConnection> accepted = new FutureResult<StreamConnection>(xnioWorker);
-        server.getIoThread().execute(new Runnable() {
-            public void run() {
+        server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends StreamConnection>>() {
+            public void handleEvent(final AcceptingChannel<? extends StreamConnection> channel) {
                 try {
                     accepted.setResult(server.accept());
                 } catch (IOException e) {
@@ -89,6 +90,7 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
                 }
             }
         });
+        server.resumeAccepts();
         serverConnection = accepted.getIoFuture().get();
         connection = openedConnection.get();
         assertNotNull(serverConnection);
@@ -98,6 +100,7 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
     }
 
     @Test
+    @Ignore("unreliable")
     public void optionSetup() throws IOException {
         initChannels();
         final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/pom.xml b/pom.xml
index 09ee69f..5ca90db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
     <artifactId>xnio-all</artifactId>
     <packaging>pom</packaging>
     <name>XNIO Parent POM</name>
-    <version>3.3.2.Final</version>
+    <version>3.3.3.Final</version>
     <description>The aggregator POM of the XNIO project</description>
 
     <modules>

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/jboss-xnio.git



More information about the pkg-java-commits mailing list