[jboss-xnio] 07/11: Revert "Revert "Imported Upstream version 3.3.3""
Markus Koschany
apo-guest at moszumanska.debian.org
Mon Dec 21 22:41:48 UTC 2015
This is an automated email from the git hooks/post-receive script.
apo-guest pushed a commit to branch master
in repository jboss-xnio.
commit bfd68eb5d25756f2eefdf01a36a3389bce69e4ec
Author: Markus Koschany <apo at debian.org>
Date: Mon Dec 21 23:20:43 2015 +0100
Revert "Revert "Imported Upstream version 3.3.3""
This reverts commit 45497b8d8818a62c0359395e2ea2e0701ba75be8.
---
api/pom.xml | 2 +-
.../main/java/org/xnio/ByteBufferSlicePool.java | 55 +++--
api/src/main/java/org/xnio/XnioWorker.java | 10 +-
api/src/test/java/org/xnio/XnioWorkerTestCase.java | 5 +
.../test/java/org/xnio/mock/XnioWorkerMock.java | 6 +-
nio-impl/pom.xml | 2 +-
.../src/main/java/org/xnio/nio/ChannelClosed.java | 26 +++
.../org/xnio/nio/NioSocketStreamConnection.java | 10 +-
.../src/main/java/org/xnio/nio/NioTcpServer.java | 26 ++-
.../main/java/org/xnio/nio/NioTcpServerHandle.java | 4 +-
nio-impl/src/main/java/org/xnio/nio/NioXnio.java | 33 ++-
.../src/main/java/org/xnio/nio/NioXnioWorker.java | 36 ++-
.../{NioTcpServer.java => QueuedNioTcpServer.java} | 253 ++++++++++-----------
.../org/xnio/nio/QueuedNioTcpServerHandle.java | 60 +++++
.../src/main/java/org/xnio/nio/WorkerThread.java | 34 ++-
.../xnio/nio/test/IllegalConnectionTestCase.java | 2 +
.../java/org/xnio/nio/test/TcpChannelTestCase.java | 7 +-
.../org/xnio/nio/test/TcpConnectionTestCase.java | 7 +-
pom.xml | 2 +-
19 files changed, 383 insertions(+), 197 deletions(-)
diff --git a/api/pom.xml b/api/pom.xml
index 40724ae..35f64c4 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -36,7 +36,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.3.2.Final</version>
+ <version>3.3.3.Final</version>
</parent>
<dependencies>
diff --git a/api/src/main/java/org/xnio/ByteBufferSlicePool.java b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
index 61913c4..2f5f1ef 100644
--- a/api/src/main/java/org/xnio/ByteBufferSlicePool.java
+++ b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
@@ -61,7 +61,7 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
protected ThreadLocalCache initialValue() {
//noinspection serial
- return new ThreadLocalCache(this);
+ return new ThreadLocalCache();
}
public void remove() {
@@ -120,13 +120,16 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
/** {@inheritDoc} */
public Pooled<ByteBuffer> allocate() {
- ThreadLocalCache localCache = localQueueHolder.get();
- if(localCache.outstanding != threadLocalQueueSize) {
- localCache.outstanding++;
- }
- Slice slice = localCache.queue.poll();
- if (slice != null) {
- return new PooledByteBuffer(slice, slice.slice());
+ Slice slice;
+ if (threadLocalQueueSize > 0) {
+ ThreadLocalCache localCache = localQueueHolder.get();
+ if(localCache.outstanding != threadLocalQueueSize) {
+ localCache.outstanding++;
+ }
+ slice = localCache.queue.poll();
+ if (slice != null) {
+ return new PooledByteBuffer(slice, slice.slice());
+ }
}
final Queue<Slice> sliceQueue = this.sliceQueue;
slice = sliceQueue.poll();
@@ -159,17 +162,21 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
private void doFree(Slice region) {
- final ThreadLocalCache localCache = localQueueHolder.get();
- boolean cacheOk = false;
- if(localCache.outstanding > 0) {
- localCache.outstanding--;
- cacheOk = true;
- }
- ArrayDeque<Slice> localQueue = localCache.queue;
- if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
- sliceQueue.add(region);
+ if (threadLocalQueueSize > 0) {
+ final ThreadLocalCache localCache = localQueueHolder.get();
+ boolean cacheOk = false;
+ if(localCache.outstanding > 0) {
+ localCache.outstanding--;
+ cacheOk = true;
+ }
+ ArrayDeque<Slice> localQueue = localCache.queue;
+ if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
+ sliceQueue.add(region);
+ } else {
+ localQueue.add(region);
+ }
} else {
- localQueue.add(region);
+ sliceQueue.add(region);
}
}
@@ -245,22 +252,24 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private final class ThreadLocalCache {
- private final ThreadLocal<ThreadLocalCache> threadLocal;
-
final ArrayDeque<Slice> queue = new ArrayDeque<Slice>(threadLocalQueueSize) {
/**
* This sucks but there's no other way to ensure these buffers are returned to the pool.
*/
protected void finalize() {
- threadLocal.remove();
+ final ArrayDeque<Slice> deque = queue;
+ Slice slice = deque.poll();
+ while (slice != null) {
+ doFree(slice);
+ slice = deque.poll();
+ }
}
};
int outstanding = 0;
- private ThreadLocalCache(ThreadLocal<ThreadLocalCache> threadLocal) {
- this.threadLocal = threadLocal;
+ ThreadLocalCache() {
}
}
}
diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java
index 52a01af..dacc224 100644
--- a/api/src/main/java/org/xnio/XnioWorker.java
+++ b/api/src/main/java/org/xnio/XnioWorker.java
@@ -199,7 +199,7 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
}
public XnioIoThread getIoThread() {
- return chooseThread();
+ return server.getIoThread();
}
public void close() throws IOException {
@@ -702,6 +702,14 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
}
/**
+ * Get an I/O thread from this worker. The thread is chosen based on the given hash code.
+ *
+ * @param hashCode the hash code
+ * @return the thread
+ */
+ public abstract XnioIoThread getIoThread(int hashCode);
+
+ /**
* Get the user task to run once termination is complete.
*
* @return the termination task
diff --git a/api/src/test/java/org/xnio/XnioWorkerTestCase.java b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
index 49c0566..4f20017 100644
--- a/api/src/test/java/org/xnio/XnioWorkerTestCase.java
+++ b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
@@ -805,6 +805,11 @@ public class XnioWorkerTestCase {
}
@Override
+ public XnioIoThread getIoThread(final int hashCode) {
+ return new XnioIoThreadMock(this);
+ }
+
+ @Override
public int getIoThreadCount() {
return 0;
}
diff --git a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
index dc635a9..f489eef 100644
--- a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
+++ b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
@@ -97,6 +97,11 @@ public class XnioWorkerMock extends XnioWorker {
return mockThread;
}
+ @Override
+ public XnioIoThreadMock getIoThread(final int hashCode) {
+ return mockThread;
+ }
+
/**
* Returns the connect behavior of this worker mock.
*/
@@ -179,5 +184,4 @@ public class XnioWorkerMock extends XnioWorker {
@Override
public void awaitTermination() throws InterruptedException {
}
-
}
diff --git a/nio-impl/pom.xml b/nio-impl/pom.xml
index 4c0a65b..badecd8 100644
--- a/nio-impl/pom.xml
+++ b/nio-impl/pom.xml
@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.3.2.Final</version>
+ <version>3.3.3.Final</version>
</parent>
<properties>
diff --git a/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java b/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
new file mode 100644
index 0000000..166bc93
--- /dev/null
+++ b/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.nio;
+
+/**
+ * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ */
+public interface ChannelClosed {
+ void channelClosed();
+}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
index 867ec4c..fc18c5b 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
@@ -34,14 +34,14 @@ import org.xnio.Options;
*/
final class NioSocketStreamConnection extends AbstractNioStreamConnection {
- private final NioTcpServerHandle serverConduit;
+ private final ChannelClosed closedHandle;
private final NioSocketConduit conduit;
- NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final NioTcpServerHandle serverConduit) {
+ NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final ChannelClosed closedHandle) {
super(workerThread);
conduit = new NioSocketConduit(workerThread, key, this);
key.attach(conduit);
- this.serverConduit = serverConduit;
+ this.closedHandle = closedHandle;
setSinkConduit(conduit);
setSourceConduit(conduit);
}
@@ -135,8 +135,8 @@ final class NioSocketStreamConnection extends AbstractNioStreamConnection {
conduit.getSocketChannel().close();
} catch (ClosedChannelException ignored) {
} finally {
- final NioTcpServerHandle conduit = this.serverConduit;
- if (conduit!= null) conduit.channelClosed();
+ final ChannelClosed closedHandle = this.closedHandle;
+ if (closedHandle!= null) closedHandle.channelClosed();
}
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
index 9e659d8..a8f8351 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
@@ -20,6 +20,7 @@ package org.xnio.nio;
import java.io.Closeable;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.jboss.logging.Logger;
import org.xnio.IoUtils;
+import org.xnio.LocalSocketAddress;
import org.xnio.Option;
import org.xnio.ChannelListener;
import org.xnio.OptionMap;
@@ -384,6 +386,25 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
try {
accepted = channel.accept();
if (accepted != null) try {
+ final SocketAddress localAddress = accepted.getLocalAddress();
+ int hash;
+ if (localAddress instanceof InetSocketAddress) {
+ final InetSocketAddress address = (InetSocketAddress) localAddress;
+ hash = address.getAddress().hashCode() * 23 + address.getPort();
+ } else if (localAddress instanceof LocalSocketAddress) {
+ hash = ((LocalSocketAddress) localAddress).getName().hashCode();
+ } else {
+ hash = localAddress.hashCode();
+ }
+ final SocketAddress remoteAddress = accepted.getRemoteAddress();
+ if (remoteAddress instanceof InetSocketAddress) {
+ final InetSocketAddress address = (InetSocketAddress) remoteAddress;
+ hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
+ } else if (remoteAddress instanceof LocalSocketAddress) {
+ hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
+ } else {
+ hash = localAddress.hashCode() * 23 + hash;
+ }
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
socket.setKeepAlive(keepAlive != 0);
@@ -391,8 +412,9 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
socket.setTcpNoDelay(tcpNoDelay != 0);
final int sendBuffer = this.sendBuffer;
if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
- final SelectionKey selectionKey = current.registerChannel(accepted);
- final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(current, selectionKey, handle);
+ final WorkerThread ioThread = worker.getIoThread(hash);
+ final SelectionKey selectionKey = ioThread.registerChannel(accepted);
+ final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
ok = true;
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
index 1b20c21..0dd63d3 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
@@ -27,7 +27,7 @@ import static org.xnio.IoUtils.safeClose;
/**
* @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
*/
-final class NioTcpServerHandle extends NioHandle {
+final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
private final Runnable freeTask;
private final NioTcpServer server;
@@ -91,7 +91,7 @@ final class NioTcpServerHandle extends NioHandle {
}
}
- void channelClosed() {
+ public void channelClosed() {
final WorkerThread thread = getWorkerThread();
if (thread == currentThread()) {
freeConnection();
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
index e9d5280..e7fcc0c 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
@@ -215,22 +215,25 @@ final class NioXnio extends Xnio {
return super.createFileSystemWatcher(name, options);
}
- private final ThreadLocal<Selector> selectorThreadLocal = new ThreadLocal<Selector>() {
+ private final ThreadLocal<FinalizableSelectorHolder> selectorThreadLocal = new ThreadLocal<FinalizableSelectorHolder>() {
public void remove() {
// if no selector was created, none will be closed
- IoUtils.safeClose(get());
+ FinalizableSelectorHolder holder = get();
+ if(holder != null) {
+ IoUtils.safeClose(holder.selector);
+ }
super.remove();
}
};
Selector getSelector() throws IOException {
- final ThreadLocal<Selector> threadLocal = selectorThreadLocal;
- Selector selector = threadLocal.get();
- if (selector == null) {
- selector = tempSelectorCreator.open();
- threadLocal.set(selector);
+ final ThreadLocal<FinalizableSelectorHolder> threadLocal = selectorThreadLocal;
+ FinalizableSelectorHolder holder = threadLocal.get();
+ if (holder == null) {
+ holder = new FinalizableSelectorHolder(tempSelectorCreator.open());
+ threadLocal.set(holder);
}
- return selector;
+ return holder.selector;
}
private static class DefaultSelectorCreator implements SelectorCreator {
@@ -292,4 +295,18 @@ final class NioXnio extends Xnio {
protected static Closeable register(XnioServerMXBean serverMXBean) {
return Xnio.register(serverMXBean);
}
+
+ private static final class FinalizableSelectorHolder {
+ final Selector selector;
+
+
+ private FinalizableSelectorHolder(Selector selector) {
+ this.selector = selector;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ IoUtils.safeClose(selector);
+ }
+ }
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
index bdd7fdc..9a36028 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
@@ -27,7 +27,7 @@ import java.net.StandardProtocolFamily;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -60,6 +60,7 @@ final class NioXnioWorker extends XnioWorker {
private volatile int state = 1;
private final WorkerThread[] workerThreads;
+ private final WorkerThread acceptThread;
private final Closeable mbeanHandle;
@SuppressWarnings("unused")
@@ -100,6 +101,10 @@ final class NioXnioWorker extends XnioWorker {
}
workerThreads[i] = workerThread;
}
+ acceptThread = new WorkerThread(this, xnio.mainSelectorCreator.open(), String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
+ if (markWorkerThreadAsDaemon) {
+ acceptThread.setDaemon(true);
+ }
ok = true;
} finally {
if (! ok) {
@@ -145,9 +150,15 @@ final class NioXnioWorker extends XnioWorker {
openResourceUnconditionally();
worker.start();
}
+ openResourceUnconditionally();
+ acceptThread.start();
}
protected WorkerThread chooseThread() {
+ return getIoThread(ThreadLocalRandom.current().nextInt());
+ }
+
+ public WorkerThread getIoThread(final int hashCode) {
final WorkerThread[] workerThreads = this.workerThreads;
final int length = workerThreads.length;
if (length == 0) {
@@ -156,8 +167,7 @@ final class NioXnioWorker extends XnioWorker {
if (length == 1) {
return workerThreads[0];
}
- final Random random = IoUtils.getThreadLocalRandom();
- return workerThreads[random.nextInt(length)];
+ return workerThreads[Math.floorMod(hashCode, length)];
}
public int getIoThreadCount() {
@@ -181,10 +191,17 @@ final class NioXnioWorker extends XnioWorker {
} else {
channel.socket().bind(bindAddress);
}
- final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
- server.setAcceptListener(acceptListener);
- ok = true;
- return server;
+ if (false) {
+ final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
+ server.setAcceptListener(acceptListener);
+ ok = true;
+ return server;
+ } else {
+ final QueuedNioTcpServer server = new QueuedNioTcpServer(this, channel, optionMap);
+ server.setAcceptListener(acceptListener);
+ ok = true;
+ return server;
+ }
} finally {
if (! ok) {
IoUtils.safeClose(channel);
@@ -275,6 +292,7 @@ final class NioXnioWorker extends XnioWorker {
for (WorkerThread worker : workerThreads) {
worker.shutdown();
}
+ acceptThread.shutdown();
shutDownTaskPool();
return;
}
@@ -352,4 +370,8 @@ final class NioXnioWorker extends XnioWorker {
public NioXnio getXnio() {
return (NioXnio) super.getXnio();
}
+
+ WorkerThread getAcceptThread() {
+ return acceptThread;
+ }
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
similarity index 68%
copy from nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
copy to nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
index 9e659d8..a401c45 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
+++ b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
@@ -1,7 +1,7 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2012 Red Hat, Inc., and individual contributors
- * as indicated by the @author tags.
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,24 +18,33 @@
package org.xnio.nio;
+import static org.xnio.IoUtils.safeClose;
+import static org.xnio.nio.Log.log;
+import static org.xnio.nio.Log.tcpServerLog;
+
import java.io.Closeable;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
import org.jboss.logging.Logger;
-import org.xnio.IoUtils;
-import org.xnio.Option;
import org.xnio.ChannelListener;
+import org.xnio.ChannelListeners;
+import org.xnio.LocalSocketAddress;
+import org.xnio.Option;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.StreamConnection;
@@ -45,21 +54,20 @@ import org.xnio.channels.AcceptingChannel;
import org.xnio.channels.UnsupportedOptionException;
import org.xnio.management.XnioServerMXBean;
-import static org.xnio.IoUtils.safeClose;
-import static org.xnio.nio.Log.log;
-import static org.xnio.nio.Log.tcpServerLog;
-
-final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<NioTcpServer> {
- private static final String FQCN = NioTcpServer.class.getName();
+final class QueuedNioTcpServer extends AbstractNioChannel<QueuedNioTcpServer> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer> {
+ private static final String FQCN = QueuedNioTcpServer.class.getName();
- private volatile ChannelListener<? super NioTcpServer> acceptListener;
+ private volatile ChannelListener<? super QueuedNioTcpServer> acceptListener;
- private final NioTcpServerHandle[] handles;
+ private final QueuedNioTcpServerHandle handle;
+ private final WorkerThread thread;
private final ServerSocketChannel channel;
private final ServerSocket socket;
private final Closeable mbeanHandle;
+ private final List<BlockingQueue<SocketChannel>> acceptQueues;
+
private static final Set<Option<?>> options = Option.setBuilder()
.add(Options.REUSE_ADDRESSES)
.add(Options.RECEIVE_BUFFER)
@@ -87,8 +95,6 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
private volatile int readTimeout;
@SuppressWarnings("unused")
private volatile int writeTimeout;
- private volatile int tokenConnectionCount;
- volatile boolean resumed;
private static final long CONN_LOW_MASK = 0x000000007FFFFFFFL;
private static final long CONN_LOW_BIT = 0L;
@@ -99,34 +105,36 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
@SuppressWarnings("unused")
private static final long CONN_HIGH_ONE = 1L << CONN_HIGH_BIT;
- private static final AtomicIntegerFieldUpdater<NioTcpServer> keepAliveUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "keepAlive");
- private static final AtomicIntegerFieldUpdater<NioTcpServer> oobInlineUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "oobInline");
- private static final AtomicIntegerFieldUpdater<NioTcpServer> tcpNoDelayUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "tcpNoDelay");
- private static final AtomicIntegerFieldUpdater<NioTcpServer> sendBufferUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "sendBuffer");
- private static final AtomicIntegerFieldUpdater<NioTcpServer> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "readTimeout");
- private static final AtomicIntegerFieldUpdater<NioTcpServer> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(NioTcpServer.class, "writeTimeout");
-
- private static final AtomicLongFieldUpdater<NioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(NioTcpServer.class, "connectionStatus");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> keepAliveUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "keepAlive");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> oobInlineUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "oobInline");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> tcpNoDelayUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "tcpNoDelay");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> sendBufferUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "sendBuffer");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "readTimeout");
+ private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "writeTimeout");
+
+ private static final AtomicLongFieldUpdater<QueuedNioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(QueuedNioTcpServer.class, "connectionStatus");
+ private final Runnable acceptTask = new Runnable() {
+ public void run() {
+ final WorkerThread current = WorkerThread.getCurrent();
+ assert current != null;
+ final BlockingQueue<SocketChannel> queue = acceptQueues.get(current.getNumber());
+ ChannelListeners.invokeChannelListener(QueuedNioTcpServer.this, getAcceptListener());
+ if (! queue.isEmpty()) {
+ current.execute(this);
+ }
+ }
+ };
- NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
+ QueuedNioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
super(worker);
this.channel = channel;
- final WorkerThread[] threads = worker.getAll();
- final int threadCount = threads.length;
- if (threadCount == 0) {
- throw log.noThreads();
- }
- final int tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
- final int connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
- if (tokens != -1) {
- if (tokens < 1 || tokens >= threadCount) {
- throw log.balancingTokens();
- }
- if (connections < 1) {
- throw log.balancingConnectionCount();
- }
- tokenConnectionCount = connections;
+ this.thread = worker.getAcceptThread();
+ final WorkerThread[] workerThreads = worker.getAll();
+ final List<BlockingQueue<SocketChannel>> acceptQueues = new ArrayList<>(workerThreads.length);
+ for (int i = 0; i < workerThreads.length; i++) {
+ acceptQueues.add(i, new LinkedBlockingQueue<SocketChannel>());
}
+ this.acceptQueues = acceptQueues;
socket = channel.socket();
if (optionMap.contains(Options.SEND_BUFFER)) {
final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
@@ -150,11 +158,11 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
if (optionMap.contains(Options.WRITE_TIMEOUT)) {
writeTimeoutUpdater.lazySet(this, optionMap.get(Options.WRITE_TIMEOUT, 0));
}
- int perThreadLow, perThreadLowRem;
- int perThreadHigh, perThreadHighRem;
+ final int highWater;
+ final int lowWater;
if (optionMap.contains(Options.CONNECTION_HIGH_WATER) || optionMap.contains(Options.CONNECTION_LOW_WATER)) {
- final int highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
- final int lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
+ highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
+ lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
if (highWater <= 0) {
throw badHighWater();
}
@@ -163,29 +171,14 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
}
final long highLowWater = (long) highWater << CONN_HIGH_BIT | (long) lowWater << CONN_LOW_BIT;
connectionStatusUpdater.lazySet(this, highLowWater);
- perThreadLow = lowWater / threadCount;
- perThreadLowRem = lowWater % threadCount;
- perThreadHigh = highWater / threadCount;
- perThreadHighRem = highWater % threadCount;
} else {
- perThreadLow = Integer.MAX_VALUE;
- perThreadLowRem = 0;
- perThreadHigh = Integer.MAX_VALUE;
- perThreadHighRem = 0;
+ highWater = Integer.MAX_VALUE;
+ lowWater = Integer.MAX_VALUE;
connectionStatusUpdater.lazySet(this, CONN_LOW_MASK | CONN_HIGH_MASK);
}
- final NioTcpServerHandle[] handles = new NioTcpServerHandle[threadCount];
- for (int i = 0, length = threadCount; i < length; i++) {
- final SelectionKey key = threads[i].registerChannel(channel);
- handles[i] = new NioTcpServerHandle(this, key, threads[i], i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
- key.attach(handles[i]);
- }
- this.handles = handles;
- if (tokens > 0) {
- for (int i = 0; i < threadCount; i ++) {
- handles[i].initializeTokenCount(i < tokens ? connections : 0);
- }
- }
+ final SelectionKey key = thread.registerChannel(channel);
+ handle = new QueuedNioTcpServerHandle(this, thread, key, highWater, lowWater);
+ key.attach(handle);
mbeanHandle = NioXnio.register(new XnioServerMXBean() {
public String getProviderName() {
return "nio";
@@ -200,22 +193,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
}
public int getConnectionCount() {
- final AtomicInteger counter = new AtomicInteger();
- final CountDownLatch latch = new CountDownLatch(handles.length);
- for (final NioTcpServerHandle handle : handles) {
- handle.getWorkerThread().execute(new Runnable() {
- public void run() {
- counter.getAndAdd(handle.getConnectionCount());
- latch.countDown();
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return counter.get();
+ return handle.getConnectionCount();
}
public int getConnectionLimitHighWater() {
@@ -240,9 +218,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
try {
channel.close();
} finally {
- for (NioTcpServerHandle handle : handles) {
- handle.getWorkerThread().cancelKey(handle.getSelectionKey());
- }
+ handle.getWorkerThread().cancelKey(handle.getSelectionKey());
safeClose(mbeanHandle);
}
}
@@ -283,7 +259,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
if (option == Options.REUSE_ADDRESSES) {
old = Boolean.valueOf(socket.getReuseAddress());
socket.setReuseAddress(Options.REUSE_ADDRESSES.cast(value, Boolean.FALSE).booleanValue());
- } else if (option == Options.RECEIVE_BUFFER) {
+ } else if (option == Options.RECEIVE_BUFFER) {
old = Integer.valueOf(socket.getReceiveBufferSize());
final int newValue = Options.RECEIVE_BUFFER.cast(value, Integer.valueOf(DEFAULT_BUFFER_SIZE)).intValue();
if (newValue < 1) {
@@ -345,23 +321,6 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
}
newVal = (long)newLowWater << CONN_LOW_BIT | (long)newHighWater << CONN_HIGH_BIT;
} while (! connectionStatusUpdater.compareAndSet(this, oldVal, newVal));
-
- final NioTcpServerHandle[] conduits = handles;
- final int threadCount = conduits.length;
-
- int perThreadLow, perThreadLowRem;
- int perThreadHigh, perThreadHighRem;
-
- perThreadLow = newLowWater / threadCount;
- perThreadLowRem = newLowWater % threadCount;
- perThreadHigh = newHighWater / threadCount;
- perThreadHighRem = newHighWater % threadCount;
-
- for (int i = 0; i < conduits.length; i++) {
- NioTcpServerHandle conduit = conduits[i];
- conduit.executeSetTask(i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
- }
-
return oldVal;
}
@@ -375,14 +334,14 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
public NioSocketStreamConnection accept() throws IOException {
final WorkerThread current = WorkerThread.getCurrent();
- final NioTcpServerHandle handle = handles[current.getNumber()];
- if (! handle.getConnection()) {
+ if (current == null) {
return null;
}
+ final BlockingQueue<SocketChannel> socketChannels = acceptQueues.get(current.getNumber());
final SocketChannel accepted;
boolean ok = false;
try {
- accepted = channel.accept();
+ accepted = socketChannels.poll();
if (accepted != null) try {
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
@@ -415,16 +374,16 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
return String.format("TCP server (NIO) <%s>", Integer.toHexString(hashCode()));
}
- public ChannelListener<? super NioTcpServer> getAcceptListener() {
+ public ChannelListener<? super QueuedNioTcpServer> getAcceptListener() {
return acceptListener;
}
- public void setAcceptListener(final ChannelListener<? super NioTcpServer> acceptListener) {
+ public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer> acceptListener) {
this.acceptListener = acceptListener;
}
- public ChannelListener.Setter<NioTcpServer> getAcceptSetter() {
- return new AcceptListenerSettable.Setter<NioTcpServer>(this);
+ public ChannelListener.Setter<QueuedNioTcpServer> getAcceptSetter() {
+ return new Setter<QueuedNioTcpServer>(this);
}
public boolean isOpen() {
@@ -441,37 +400,21 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
}
public void suspendAccepts() {
- resumed = false;
- doResume(0);
+ handle.suspend(0);
}
public void resumeAccepts() {
- resumed = true;
- doResume(SelectionKey.OP_ACCEPT);
+ handle.resume(SelectionKey.OP_ACCEPT);
}
public boolean isAcceptResumed() {
- return resumed;
- }
-
- private void doResume(final int op) {
- if (op == 0) {
- for (NioTcpServerHandle handle : handles) {
- handle.suspend();
- }
- } else {
- for (NioTcpServerHandle handle : handles) {
- handle.resume();
- }
- }
+ return handle.isResumed(SelectionKey.OP_ACCEPT);
}
public void wakeupAccepts() {
tcpServerLog.logf(FQCN, Logger.Level.TRACE, null, "Wake up accepts on %s", this);
resumeAccepts();
- final NioTcpServerHandle[] handles = this.handles;
- final int idx = IoUtils.getThreadLocalRandom().nextInt(handles.length);
- handles[idx].wakeup(SelectionKey.OP_ACCEPT);
+ handle.wakeup(SelectionKey.OP_ACCEPT);
}
public void awaitAcceptable() throws IOException {
@@ -487,11 +430,53 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
return getIoThread();
}
- NioTcpServerHandle getHandle(final int number) {
- return handles[number];
- }
+ void handleReady() {
+ try {
+ final SocketChannel accepted = channel.accept();
+ boolean ok = false;
+ if (accepted != null) try {
+ final SocketAddress localAddress = accepted.getLocalAddress();
+ int hash;
+ if (localAddress instanceof InetSocketAddress) {
+ final InetSocketAddress address = (InetSocketAddress) localAddress;
+ hash = address.getAddress().hashCode() * 23 + address.getPort();
+ } else if (localAddress instanceof LocalSocketAddress) {
+ hash = ((LocalSocketAddress) localAddress).getName().hashCode();
+ } else {
+ hash = localAddress.hashCode();
+ }
+ final SocketAddress remoteAddress = accepted.getRemoteAddress();
+ if (remoteAddress instanceof InetSocketAddress) {
+ final InetSocketAddress address = (InetSocketAddress) remoteAddress;
+ hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
+ } else if (remoteAddress instanceof LocalSocketAddress) {
+ hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
+ } else {
+ hash = localAddress.hashCode() * 23 + hash;
+ }
+ accepted.configureBlocking(false);
+ final Socket socket = accepted.socket();
+ socket.setKeepAlive(keepAlive != 0);
+ socket.setOOBInline(oobInline != 0);
+ socket.setTcpNoDelay(tcpNoDelay != 0);
+ final int sendBuffer = this.sendBuffer;
+ if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
+ final WorkerThread ioThread = worker.getIoThread(hash);
+ final SelectionKey selectionKey = ioThread.registerChannel(accepted);
+ final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
+ newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
+ newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
+ ok = true;
+ final int number = ioThread.getNumber();
+ final BlockingQueue<SocketChannel> queue = acceptQueues.get(number);
+ queue.add(accepted);
+ // todo: only execute if necessary
+ ioThread.execute(acceptTask);
+ } finally {
+ if (! ok) safeClose(accepted);
+ }
+ } catch (IOException ignored) {
- int getTokenConnectionCount() {
- return tokenConnectionCount;
+ }
}
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
new file mode 100644
index 0000000..8a86b70
--- /dev/null
+++ b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.nio;
+
+import static org.xnio.IoUtils.safeClose;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ */
+final class QueuedNioTcpServerHandle extends NioHandle implements ChannelClosed {
+
+ private final QueuedNioTcpServer server;
+
+ QueuedNioTcpServerHandle(final QueuedNioTcpServer server, final WorkerThread workerThread, final SelectionKey key, final int highWater, final int lowWater) {
+ super(workerThread, key);
+ this.server = server;
+ }
+
+ void handleReady(final int ops) {
+ server.handleReady();
+ }
+
+ void forceTermination() {
+ safeClose(server);
+ }
+
+ void terminated() {
+ server.invokeCloseHandler();
+ }
+
+ public void channelClosed() {
+ freeConnection();
+ }
+
+ void freeConnection() {
+ // ignore for now
+ }
+
+ int getConnectionCount() {
+ return -1;
+ }
+}
diff --git a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
index 5ccdb96..19ca1a0 100644
--- a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
+++ b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
@@ -97,7 +97,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
static {
OLD_LOCKING = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.old-locking", "false")));
- THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.xnio.thread-safe-selection-keys", "false")));
+ THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.thread-safe-selection-keys", "false")));
}
WorkerThread(final NioXnioWorker worker, final Selector selector, final String name, final ThreadGroup group, final long stackSize, final int number) {
@@ -419,6 +419,8 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
}
}
+ volatile boolean polling;
+
public void run() {
final Selector selector = this.selector;
try {
@@ -499,11 +501,29 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
selector.selectNow();
} else if (delayTime == Long.MAX_VALUE) {
selectorLog.tracef("Beginning select on %s", selector);
- selector.select();
+ polling = true;
+ try {
+ if (workQueue.peek() != null) {
+ selector.selectNow();
+ } else {
+ selector.select();
+ }
+ } finally {
+ polling = false;
+ }
} else {
final long millis = 1L + delayTime / 1000000L;
selectorLog.tracef("Beginning select on %s (with timeout)", selector);
- selector.select(millis);
+ polling = true;
+ try {
+ if (workQueue.peek() != null) {
+ selector.selectNow();
+ } else {
+ selector.select(millis);
+ }
+ } finally {
+ polling = false;
+ }
}
} catch (CancelledKeyException ignored) {
// Mac and other buggy implementations sometimes spits these out
@@ -570,7 +590,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
synchronized (workLock) {
selectorWorkQueue.add(command);
}
- if(currentThread() != this) {
+ if (polling) { // flag is always false if we're the same thread
selector.wakeup();
}
}
@@ -605,7 +625,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
queue.add(key);
if (queue.iterator().next() == key) {
// we're the next one up; poke the selector to update its delay time
- if(currentThread() != this) {
+ if (polling) { // flag is always false if we're the same thread
selector.wakeup();
}
}
@@ -665,7 +685,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
try {
return channel.register(selector, 0);
} finally {
- selector.wakeup();
+ if (polling) selector.wakeup();
}
} else {
final SynchTask task = new SynchTask();
@@ -744,7 +764,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
} else {
try {
key.interestOps(key.interestOps() | ops);
- selector.wakeup();
+ if (polling) selector.wakeup();
} catch (CancelledKeyException ignored) {
}
}
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
index 0ad5726..19d4369 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.xnio.OptionMap;
import org.xnio.Options;
@@ -62,6 +63,7 @@ public class IllegalConnectionTestCase {
}
@Test
+ @Ignore
public void illegalAcceptThreads() throws IOException {
IllegalArgumentException expected;
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
index 0ff0675..126f238 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
@@ -30,6 +30,7 @@ import java.net.SocketAddress;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.xnio.ChannelListener;
import org.xnio.FutureResult;
@@ -80,8 +81,8 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
final IoFuture<ConnectedStreamChannel> connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, optionMap);
final FutureResult<ConnectedStreamChannel> accepted = new FutureResult<ConnectedStreamChannel>(xnioWorker);
- server.getAcceptThread().execute(new Runnable() {
- public void run() {
+ server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends ConnectedStreamChannel>>() {
+ public void handleEvent(final AcceptingChannel<? extends ConnectedStreamChannel> channel) {
try {
accepted.setResult(server.accept());
} catch (IOException e) {
@@ -89,6 +90,7 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
}
});
+ server.resumeAccepts();
serverChannel = accepted.getIoFuture().get();
channel = connectedStreamChannel.get();
assertNotNull(serverChannel);
@@ -98,6 +100,7 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
@Test
+ @Ignore("unreliable")
public void optionSetup() throws IOException {
initChannels();
final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
index b997afb..13c1387 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
@@ -30,6 +30,7 @@ import java.net.SocketAddress;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.xnio.ChannelListener;
import org.xnio.FutureResult;
@@ -80,8 +81,8 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
final IoFuture<StreamConnection> openedConnection = xnioWorker.openStreamConnection(bindAddress, null, optionMap);
final FutureResult<StreamConnection> accepted = new FutureResult<StreamConnection>(xnioWorker);
- server.getIoThread().execute(new Runnable() {
- public void run() {
+ server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends StreamConnection>>() {
+ public void handleEvent(final AcceptingChannel<? extends StreamConnection> channel) {
try {
accepted.setResult(server.accept());
} catch (IOException e) {
@@ -89,6 +90,7 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
}
});
+ server.resumeAccepts();
serverConnection = accepted.getIoFuture().get();
connection = openedConnection.get();
assertNotNull(serverConnection);
@@ -98,6 +100,7 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
@Test
+ @Ignore("unreliable")
public void optionSetup() throws IOException {
initChannels();
final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/pom.xml b/pom.xml
index 09ee69f..5ca90db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
- <version>3.3.2.Final</version>
+ <version>3.3.3.Final</version>
<description>The aggregator POM of the XNIO project</description>
<modules>
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/jboss-xnio.git
More information about the pkg-java-commits mailing list