[jboss-xnio] 01/11: Revert "Imported Upstream version 3.3.3"
Markus Koschany
apo-guest at moszumanska.debian.org
Mon Dec 21 22:41:46 UTC 2015
This is an automated email from the git hooks/post-receive script.
apo-guest pushed a commit to branch master
in repository jboss-xnio.
commit 45497b8d8818a62c0359395e2ea2e0701ba75be8
Author: Markus Koschany <apo at debian.org>
Date: Mon Dec 21 18:57:58 2015 +0100
Revert "Imported Upstream version 3.3.3"
This reverts commit 753c3cc993b4d4ddae27235dcff940a496699a05.
---
api/pom.xml | 2 +-
.../main/java/org/xnio/ByteBufferSlicePool.java | 55 +--
api/src/main/java/org/xnio/XnioWorker.java | 10 +-
api/src/test/java/org/xnio/XnioWorkerTestCase.java | 5 -
.../test/java/org/xnio/mock/XnioWorkerMock.java | 6 +-
nio-impl/pom.xml | 2 +-
.../src/main/java/org/xnio/nio/ChannelClosed.java | 26 --
.../org/xnio/nio/NioSocketStreamConnection.java | 10 +-
.../src/main/java/org/xnio/nio/NioTcpServer.java | 26 +-
.../main/java/org/xnio/nio/NioTcpServerHandle.java | 4 +-
nio-impl/src/main/java/org/xnio/nio/NioXnio.java | 33 +-
.../src/main/java/org/xnio/nio/NioXnioWorker.java | 36 +-
.../main/java/org/xnio/nio/QueuedNioTcpServer.java | 482 ---------------------
.../org/xnio/nio/QueuedNioTcpServerHandle.java | 60 ---
.../src/main/java/org/xnio/nio/WorkerThread.java | 34 +-
.../xnio/nio/test/IllegalConnectionTestCase.java | 2 -
.../java/org/xnio/nio/test/TcpChannelTestCase.java | 7 +-
.../org/xnio/nio/test/TcpConnectionTestCase.java | 7 +-
pom.xml | 2 +-
19 files changed, 63 insertions(+), 746 deletions(-)
diff --git a/api/pom.xml b/api/pom.xml
index 35f64c4..40724ae 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -36,7 +36,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.3.3.Final</version>
+ <version>3.3.2.Final</version>
</parent>
<dependencies>
diff --git a/api/src/main/java/org/xnio/ByteBufferSlicePool.java b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
index 2f5f1ef..61913c4 100644
--- a/api/src/main/java/org/xnio/ByteBufferSlicePool.java
+++ b/api/src/main/java/org/xnio/ByteBufferSlicePool.java
@@ -61,7 +61,7 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
protected ThreadLocalCache initialValue() {
//noinspection serial
- return new ThreadLocalCache();
+ return new ThreadLocalCache(this);
}
public void remove() {
@@ -120,16 +120,13 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
/** {@inheritDoc} */
public Pooled<ByteBuffer> allocate() {
- Slice slice;
- if (threadLocalQueueSize > 0) {
- ThreadLocalCache localCache = localQueueHolder.get();
- if(localCache.outstanding != threadLocalQueueSize) {
- localCache.outstanding++;
- }
- slice = localCache.queue.poll();
- if (slice != null) {
- return new PooledByteBuffer(slice, slice.slice());
- }
+ ThreadLocalCache localCache = localQueueHolder.get();
+ if(localCache.outstanding != threadLocalQueueSize) {
+ localCache.outstanding++;
+ }
+ Slice slice = localCache.queue.poll();
+ if (slice != null) {
+ return new PooledByteBuffer(slice, slice.slice());
}
final Queue<Slice> sliceQueue = this.sliceQueue;
slice = sliceQueue.poll();
@@ -162,21 +159,17 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
private void doFree(Slice region) {
- if (threadLocalQueueSize > 0) {
- final ThreadLocalCache localCache = localQueueHolder.get();
- boolean cacheOk = false;
- if(localCache.outstanding > 0) {
- localCache.outstanding--;
- cacheOk = true;
- }
- ArrayDeque<Slice> localQueue = localCache.queue;
- if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
- sliceQueue.add(region);
- } else {
- localQueue.add(region);
- }
- } else {
+ final ThreadLocalCache localCache = localQueueHolder.get();
+ boolean cacheOk = false;
+ if(localCache.outstanding > 0) {
+ localCache.outstanding--;
+ cacheOk = true;
+ }
+ ArrayDeque<Slice> localQueue = localCache.queue;
+ if (localQueue.size() == threadLocalQueueSize || !cacheOk) {
sliceQueue.add(region);
+ } else {
+ localQueue.add(region);
}
}
@@ -252,24 +245,22 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private final class ThreadLocalCache {
+ private final ThreadLocal<ThreadLocalCache> threadLocal;
+
final ArrayDeque<Slice> queue = new ArrayDeque<Slice>(threadLocalQueueSize) {
/**
* This sucks but there's no other way to ensure these buffers are returned to the pool.
*/
protected void finalize() {
- final ArrayDeque<Slice> deque = queue;
- Slice slice = deque.poll();
- while (slice != null) {
- doFree(slice);
- slice = deque.poll();
- }
+ threadLocal.remove();
}
};
int outstanding = 0;
- ThreadLocalCache() {
+ private ThreadLocalCache(ThreadLocal<ThreadLocalCache> threadLocal) {
+ this.threadLocal = threadLocal;
}
}
}
diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java
index dacc224..52a01af 100644
--- a/api/src/main/java/org/xnio/XnioWorker.java
+++ b/api/src/main/java/org/xnio/XnioWorker.java
@@ -199,7 +199,7 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
}
public XnioIoThread getIoThread() {
- return server.getIoThread();
+ return chooseThread();
}
public void close() throws IOException {
@@ -702,14 +702,6 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
}
/**
- * Get an I/O thread from this worker. The thread is chosen based on the given hash code.
- *
- * @param hashCode the hash code
- * @return the thread
- */
- public abstract XnioIoThread getIoThread(int hashCode);
-
- /**
* Get the user task to run once termination is complete.
*
* @return the termination task
diff --git a/api/src/test/java/org/xnio/XnioWorkerTestCase.java b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
index 4f20017..49c0566 100644
--- a/api/src/test/java/org/xnio/XnioWorkerTestCase.java
+++ b/api/src/test/java/org/xnio/XnioWorkerTestCase.java
@@ -805,11 +805,6 @@ public class XnioWorkerTestCase {
}
@Override
- public XnioIoThread getIoThread(final int hashCode) {
- return new XnioIoThreadMock(this);
- }
-
- @Override
public int getIoThreadCount() {
return 0;
}
diff --git a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
index f489eef..dc635a9 100644
--- a/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
+++ b/api/src/test/java/org/xnio/mock/XnioWorkerMock.java
@@ -97,11 +97,6 @@ public class XnioWorkerMock extends XnioWorker {
return mockThread;
}
- @Override
- public XnioIoThreadMock getIoThread(final int hashCode) {
- return mockThread;
- }
-
/**
* Returns the connect behavior of this worker mock.
*/
@@ -184,4 +179,5 @@ public class XnioWorkerMock extends XnioWorker {
@Override
public void awaitTermination() throws InterruptedException {
}
+
}
diff --git a/nio-impl/pom.xml b/nio-impl/pom.xml
index badecd8..4c0a65b 100644
--- a/nio-impl/pom.xml
+++ b/nio-impl/pom.xml
@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.3.3.Final</version>
+ <version>3.3.2.Final</version>
</parent>
<properties>
diff --git a/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java b/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
deleted file mode 100644
index 166bc93..0000000
--- a/nio-impl/src/main/java/org/xnio/nio/ChannelClosed.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2015 Red Hat, Inc. and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.xnio.nio;
-
-/**
- * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
- */
-public interface ChannelClosed {
- void channelClosed();
-}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
index fc18c5b..867ec4c 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioSocketStreamConnection.java
@@ -34,14 +34,14 @@ import org.xnio.Options;
*/
final class NioSocketStreamConnection extends AbstractNioStreamConnection {
- private final ChannelClosed closedHandle;
+ private final NioTcpServerHandle serverConduit;
private final NioSocketConduit conduit;
- NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final ChannelClosed closedHandle) {
+ NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final NioTcpServerHandle serverConduit) {
super(workerThread);
conduit = new NioSocketConduit(workerThread, key, this);
key.attach(conduit);
- this.closedHandle = closedHandle;
+ this.serverConduit = serverConduit;
setSinkConduit(conduit);
setSourceConduit(conduit);
}
@@ -135,8 +135,8 @@ final class NioSocketStreamConnection extends AbstractNioStreamConnection {
conduit.getSocketChannel().close();
} catch (ClosedChannelException ignored) {
} finally {
- final ChannelClosed closedHandle = this.closedHandle;
- if (closedHandle!= null) closedHandle.channelClosed();
+ final NioTcpServerHandle conduit = this.serverConduit;
+ if (conduit!= null) conduit.channelClosed();
}
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
index a8f8351..9e659d8 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServer.java
@@ -20,7 +20,6 @@ package org.xnio.nio;
import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
@@ -35,7 +34,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.jboss.logging.Logger;
import org.xnio.IoUtils;
-import org.xnio.LocalSocketAddress;
import org.xnio.Option;
import org.xnio.ChannelListener;
import org.xnio.OptionMap;
@@ -386,25 +384,6 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
try {
accepted = channel.accept();
if (accepted != null) try {
- final SocketAddress localAddress = accepted.getLocalAddress();
- int hash;
- if (localAddress instanceof InetSocketAddress) {
- final InetSocketAddress address = (InetSocketAddress) localAddress;
- hash = address.getAddress().hashCode() * 23 + address.getPort();
- } else if (localAddress instanceof LocalSocketAddress) {
- hash = ((LocalSocketAddress) localAddress).getName().hashCode();
- } else {
- hash = localAddress.hashCode();
- }
- final SocketAddress remoteAddress = accepted.getRemoteAddress();
- if (remoteAddress instanceof InetSocketAddress) {
- final InetSocketAddress address = (InetSocketAddress) remoteAddress;
- hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
- } else if (remoteAddress instanceof LocalSocketAddress) {
- hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
- } else {
- hash = localAddress.hashCode() * 23 + hash;
- }
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
socket.setKeepAlive(keepAlive != 0);
@@ -412,9 +391,8 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
socket.setTcpNoDelay(tcpNoDelay != 0);
final int sendBuffer = this.sendBuffer;
if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
- final WorkerThread ioThread = worker.getIoThread(hash);
- final SelectionKey selectionKey = ioThread.registerChannel(accepted);
- final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
+ final SelectionKey selectionKey = current.registerChannel(accepted);
+ final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(current, selectionKey, handle);
newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
ok = true;
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
index 0dd63d3..1b20c21 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioTcpServerHandle.java
@@ -27,7 +27,7 @@ import static org.xnio.IoUtils.safeClose;
/**
* @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
*/
-final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
+final class NioTcpServerHandle extends NioHandle {
private final Runnable freeTask;
private final NioTcpServer server;
@@ -91,7 +91,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
}
}
- public void channelClosed() {
+ void channelClosed() {
final WorkerThread thread = getWorkerThread();
if (thread == currentThread()) {
freeConnection();
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
index e7fcc0c..e9d5280 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnio.java
@@ -215,25 +215,22 @@ final class NioXnio extends Xnio {
return super.createFileSystemWatcher(name, options);
}
- private final ThreadLocal<FinalizableSelectorHolder> selectorThreadLocal = new ThreadLocal<FinalizableSelectorHolder>() {
+ private final ThreadLocal<Selector> selectorThreadLocal = new ThreadLocal<Selector>() {
public void remove() {
// if no selector was created, none will be closed
- FinalizableSelectorHolder holder = get();
- if(holder != null) {
- IoUtils.safeClose(holder.selector);
- }
+ IoUtils.safeClose(get());
super.remove();
}
};
Selector getSelector() throws IOException {
- final ThreadLocal<FinalizableSelectorHolder> threadLocal = selectorThreadLocal;
- FinalizableSelectorHolder holder = threadLocal.get();
- if (holder == null) {
- holder = new FinalizableSelectorHolder(tempSelectorCreator.open());
- threadLocal.set(holder);
+ final ThreadLocal<Selector> threadLocal = selectorThreadLocal;
+ Selector selector = threadLocal.get();
+ if (selector == null) {
+ selector = tempSelectorCreator.open();
+ threadLocal.set(selector);
}
- return holder.selector;
+ return selector;
}
private static class DefaultSelectorCreator implements SelectorCreator {
@@ -295,18 +292,4 @@ final class NioXnio extends Xnio {
protected static Closeable register(XnioServerMXBean serverMXBean) {
return Xnio.register(serverMXBean);
}
-
- private static final class FinalizableSelectorHolder {
- final Selector selector;
-
-
- private FinalizableSelectorHolder(Selector selector) {
- this.selector = selector;
- }
-
- @Override
- protected void finalize() throws Throwable {
- IoUtils.safeClose(selector);
- }
- }
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
index 9a36028..bdd7fdc 100644
--- a/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
+++ b/nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
@@ -27,7 +27,7 @@ import java.net.StandardProtocolFamily;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -60,7 +60,6 @@ final class NioXnioWorker extends XnioWorker {
private volatile int state = 1;
private final WorkerThread[] workerThreads;
- private final WorkerThread acceptThread;
private final Closeable mbeanHandle;
@SuppressWarnings("unused")
@@ -101,10 +100,6 @@ final class NioXnioWorker extends XnioWorker {
}
workerThreads[i] = workerThread;
}
- acceptThread = new WorkerThread(this, xnio.mainSelectorCreator.open(), String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
- if (markWorkerThreadAsDaemon) {
- acceptThread.setDaemon(true);
- }
ok = true;
} finally {
if (! ok) {
@@ -150,15 +145,9 @@ final class NioXnioWorker extends XnioWorker {
openResourceUnconditionally();
worker.start();
}
- openResourceUnconditionally();
- acceptThread.start();
}
protected WorkerThread chooseThread() {
- return getIoThread(ThreadLocalRandom.current().nextInt());
- }
-
- public WorkerThread getIoThread(final int hashCode) {
final WorkerThread[] workerThreads = this.workerThreads;
final int length = workerThreads.length;
if (length == 0) {
@@ -167,7 +156,8 @@ final class NioXnioWorker extends XnioWorker {
if (length == 1) {
return workerThreads[0];
}
- return workerThreads[Math.floorMod(hashCode, length)];
+ final Random random = IoUtils.getThreadLocalRandom();
+ return workerThreads[random.nextInt(length)];
}
public int getIoThreadCount() {
@@ -191,17 +181,10 @@ final class NioXnioWorker extends XnioWorker {
} else {
channel.socket().bind(bindAddress);
}
- if (false) {
- final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
- server.setAcceptListener(acceptListener);
- ok = true;
- return server;
- } else {
- final QueuedNioTcpServer server = new QueuedNioTcpServer(this, channel, optionMap);
- server.setAcceptListener(acceptListener);
- ok = true;
- return server;
- }
+ final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
+ server.setAcceptListener(acceptListener);
+ ok = true;
+ return server;
} finally {
if (! ok) {
IoUtils.safeClose(channel);
@@ -292,7 +275,6 @@ final class NioXnioWorker extends XnioWorker {
for (WorkerThread worker : workerThreads) {
worker.shutdown();
}
- acceptThread.shutdown();
shutDownTaskPool();
return;
}
@@ -370,8 +352,4 @@ final class NioXnioWorker extends XnioWorker {
public NioXnio getXnio() {
return (NioXnio) super.getXnio();
}
-
- WorkerThread getAcceptThread() {
- return acceptThread;
- }
}
diff --git a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
deleted file mode 100644
index a401c45..0000000
--- a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServer.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2015 Red Hat, Inc. and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.xnio.nio;
-
-import static org.xnio.IoUtils.safeClose;
-import static org.xnio.nio.Log.log;
-import static org.xnio.nio.Log.tcpServerLog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.jboss.logging.Logger;
-import org.xnio.ChannelListener;
-import org.xnio.ChannelListeners;
-import org.xnio.LocalSocketAddress;
-import org.xnio.Option;
-import org.xnio.OptionMap;
-import org.xnio.Options;
-import org.xnio.StreamConnection;
-import org.xnio.XnioExecutor;
-import org.xnio.channels.AcceptListenerSettable;
-import org.xnio.channels.AcceptingChannel;
-import org.xnio.channels.UnsupportedOptionException;
-import org.xnio.management.XnioServerMXBean;
-
-final class QueuedNioTcpServer extends AbstractNioChannel<QueuedNioTcpServer> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer> {
- private static final String FQCN = QueuedNioTcpServer.class.getName();
-
- private volatile ChannelListener<? super QueuedNioTcpServer> acceptListener;
-
- private final QueuedNioTcpServerHandle handle;
- private final WorkerThread thread;
-
- private final ServerSocketChannel channel;
- private final ServerSocket socket;
- private final Closeable mbeanHandle;
-
- private final List<BlockingQueue<SocketChannel>> acceptQueues;
-
- private static final Set<Option<?>> options = Option.setBuilder()
- .add(Options.REUSE_ADDRESSES)
- .add(Options.RECEIVE_BUFFER)
- .add(Options.SEND_BUFFER)
- .add(Options.KEEP_ALIVE)
- .add(Options.TCP_OOB_INLINE)
- .add(Options.TCP_NODELAY)
- .add(Options.CONNECTION_HIGH_WATER)
- .add(Options.CONNECTION_LOW_WATER)
- .add(Options.READ_TIMEOUT)
- .add(Options.WRITE_TIMEOUT)
- .create();
-
- @SuppressWarnings("unused")
- private volatile int keepAlive;
- @SuppressWarnings("unused")
- private volatile int oobInline;
- @SuppressWarnings("unused")
- private volatile int tcpNoDelay;
- @SuppressWarnings("unused")
- private volatile int sendBuffer = -1;
- @SuppressWarnings("unused")
- private volatile long connectionStatus = CONN_LOW_MASK | CONN_HIGH_MASK;
- @SuppressWarnings("unused")
- private volatile int readTimeout;
- @SuppressWarnings("unused")
- private volatile int writeTimeout;
-
- private static final long CONN_LOW_MASK = 0x000000007FFFFFFFL;
- private static final long CONN_LOW_BIT = 0L;
- @SuppressWarnings("unused")
- private static final long CONN_LOW_ONE = 1L;
- private static final long CONN_HIGH_MASK = 0x3FFFFFFF80000000L;
- private static final long CONN_HIGH_BIT = 31L;
- @SuppressWarnings("unused")
- private static final long CONN_HIGH_ONE = 1L << CONN_HIGH_BIT;
-
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> keepAliveUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "keepAlive");
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> oobInlineUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "oobInline");
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> tcpNoDelayUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "tcpNoDelay");
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> sendBufferUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "sendBuffer");
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> readTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "readTimeout");
- private static final AtomicIntegerFieldUpdater<QueuedNioTcpServer> writeTimeoutUpdater = AtomicIntegerFieldUpdater.newUpdater(QueuedNioTcpServer.class, "writeTimeout");
-
- private static final AtomicLongFieldUpdater<QueuedNioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(QueuedNioTcpServer.class, "connectionStatus");
- private final Runnable acceptTask = new Runnable() {
- public void run() {
- final WorkerThread current = WorkerThread.getCurrent();
- assert current != null;
- final BlockingQueue<SocketChannel> queue = acceptQueues.get(current.getNumber());
- ChannelListeners.invokeChannelListener(QueuedNioTcpServer.this, getAcceptListener());
- if (! queue.isEmpty()) {
- current.execute(this);
- }
- }
- };
-
- QueuedNioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
- super(worker);
- this.channel = channel;
- this.thread = worker.getAcceptThread();
- final WorkerThread[] workerThreads = worker.getAll();
- final List<BlockingQueue<SocketChannel>> acceptQueues = new ArrayList<>(workerThreads.length);
- for (int i = 0; i < workerThreads.length; i++) {
- acceptQueues.add(i, new LinkedBlockingQueue<SocketChannel>());
- }
- this.acceptQueues = acceptQueues;
- socket = channel.socket();
- if (optionMap.contains(Options.SEND_BUFFER)) {
- final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
- if (sendBufferSize < 1) {
- throw log.parameterOutOfRange("sendBufferSize");
- }
- sendBufferUpdater.set(this, sendBufferSize);
- }
- if (optionMap.contains(Options.KEEP_ALIVE)) {
- keepAliveUpdater.lazySet(this, optionMap.get(Options.KEEP_ALIVE, false) ? 1 : 0);
- }
- if (optionMap.contains(Options.TCP_OOB_INLINE)) {
- oobInlineUpdater.lazySet(this, optionMap.get(Options.TCP_OOB_INLINE, false) ? 1 : 0);
- }
- if (optionMap.contains(Options.TCP_NODELAY)) {
- tcpNoDelayUpdater.lazySet(this, optionMap.get(Options.TCP_NODELAY, false) ? 1 : 0);
- }
- if (optionMap.contains(Options.READ_TIMEOUT)) {
- readTimeoutUpdater.lazySet(this, optionMap.get(Options.READ_TIMEOUT, 0));
- }
- if (optionMap.contains(Options.WRITE_TIMEOUT)) {
- writeTimeoutUpdater.lazySet(this, optionMap.get(Options.WRITE_TIMEOUT, 0));
- }
- final int highWater;
- final int lowWater;
- if (optionMap.contains(Options.CONNECTION_HIGH_WATER) || optionMap.contains(Options.CONNECTION_LOW_WATER)) {
- highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
- lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
- if (highWater <= 0) {
- throw badHighWater();
- }
- if (lowWater <= 0 || lowWater > highWater) {
- throw badLowWater(highWater);
- }
- final long highLowWater = (long) highWater << CONN_HIGH_BIT | (long) lowWater << CONN_LOW_BIT;
- connectionStatusUpdater.lazySet(this, highLowWater);
- } else {
- highWater = Integer.MAX_VALUE;
- lowWater = Integer.MAX_VALUE;
- connectionStatusUpdater.lazySet(this, CONN_LOW_MASK | CONN_HIGH_MASK);
- }
- final SelectionKey key = thread.registerChannel(channel);
- handle = new QueuedNioTcpServerHandle(this, thread, key, highWater, lowWater);
- key.attach(handle);
- mbeanHandle = NioXnio.register(new XnioServerMXBean() {
- public String getProviderName() {
- return "nio";
- }
-
- public String getWorkerName() {
- return worker.getName();
- }
-
- public String getBindAddress() {
- return String.valueOf(getLocalAddress());
- }
-
- public int getConnectionCount() {
- return handle.getConnectionCount();
- }
-
- public int getConnectionLimitHighWater() {
- return getHighWater(connectionStatus);
- }
-
- public int getConnectionLimitLowWater() {
- return getLowWater(connectionStatus);
- }
- });
- }
-
- private static IllegalArgumentException badLowWater(final int highWater) {
- return new IllegalArgumentException("Low water must be greater than 0 and less than or equal to high water (" + highWater + ")");
- }
-
- private static IllegalArgumentException badHighWater() {
- return new IllegalArgumentException("High water must be greater than 0");
- }
-
- public void close() throws IOException {
- try {
- channel.close();
- } finally {
- handle.getWorkerThread().cancelKey(handle.getSelectionKey());
- safeClose(mbeanHandle);
- }
- }
-
- public boolean supportsOption(final Option<?> option) {
- return options.contains(option);
- }
-
- public <T> T getOption(final Option<T> option) throws UnsupportedOptionException, IOException {
- if (option == Options.REUSE_ADDRESSES) {
- return option.cast(Boolean.valueOf(socket.getReuseAddress()));
- } else if (option == Options.RECEIVE_BUFFER) {
- return option.cast(Integer.valueOf(socket.getReceiveBufferSize()));
- } else if (option == Options.SEND_BUFFER) {
- final int value = sendBuffer;
- return value == -1 ? null : option.cast(Integer.valueOf(value));
- } else if (option == Options.KEEP_ALIVE) {
- return option.cast(Boolean.valueOf(keepAlive != 0));
- } else if (option == Options.TCP_OOB_INLINE) {
- return option.cast(Boolean.valueOf(oobInline != 0));
- } else if (option == Options.TCP_NODELAY) {
- return option.cast(Boolean.valueOf(tcpNoDelay != 0));
- } else if (option == Options.READ_TIMEOUT) {
- return option.cast(Integer.valueOf(readTimeout));
- } else if (option == Options.WRITE_TIMEOUT) {
- return option.cast(Integer.valueOf(writeTimeout));
- } else if (option == Options.CONNECTION_HIGH_WATER) {
- return option.cast(Integer.valueOf(getHighWater(connectionStatus)));
- } else if (option == Options.CONNECTION_LOW_WATER) {
- return option.cast(Integer.valueOf(getLowWater(connectionStatus)));
- } else {
- return null;
- }
- }
-
- public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
- final Object old;
- if (option == Options.REUSE_ADDRESSES) {
- old = Boolean.valueOf(socket.getReuseAddress());
- socket.setReuseAddress(Options.REUSE_ADDRESSES.cast(value, Boolean.FALSE).booleanValue());
- } else if (option == Options.RECEIVE_BUFFER) {
- old = Integer.valueOf(socket.getReceiveBufferSize());
- final int newValue = Options.RECEIVE_BUFFER.cast(value, Integer.valueOf(DEFAULT_BUFFER_SIZE)).intValue();
- if (newValue < 1) {
- throw log.optionOutOfRange("RECEIVE_BUFFER");
- }
- socket.setReceiveBufferSize(newValue);
- } else if (option == Options.SEND_BUFFER) {
- final int newValue = Options.SEND_BUFFER.cast(value, Integer.valueOf(DEFAULT_BUFFER_SIZE)).intValue();
- if (newValue < 1) {
- throw log.optionOutOfRange("SEND_BUFFER");
- }
- final int oldValue = sendBufferUpdater.getAndSet(this, newValue);
- old = oldValue == -1 ? null : Integer.valueOf(oldValue);
- } else if (option == Options.KEEP_ALIVE) {
- old = Boolean.valueOf(keepAliveUpdater.getAndSet(this, Options.KEEP_ALIVE.cast(value, Boolean.FALSE).booleanValue() ? 1 : 0) != 0);
- } else if (option == Options.TCP_OOB_INLINE) {
- old = Boolean.valueOf(oobInlineUpdater.getAndSet(this, Options.TCP_OOB_INLINE.cast(value, Boolean.FALSE).booleanValue() ? 1 : 0) != 0);
- } else if (option == Options.TCP_NODELAY) {
- old = Boolean.valueOf(tcpNoDelayUpdater.getAndSet(this, Options.TCP_NODELAY.cast(value, Boolean.FALSE).booleanValue() ? 1 : 0) != 0);
- } else if (option == Options.READ_TIMEOUT) {
- old = Integer.valueOf(readTimeoutUpdater.getAndSet(this, Options.READ_TIMEOUT.cast(value, Integer.valueOf(0)).intValue()));
- } else if (option == Options.WRITE_TIMEOUT) {
- old = Integer.valueOf(writeTimeoutUpdater.getAndSet(this, Options.WRITE_TIMEOUT.cast(value, Integer.valueOf(0)).intValue()));
- } else if (option == Options.CONNECTION_HIGH_WATER) {
- old = Integer.valueOf(getHighWater(updateWaterMark(-1, Options.CONNECTION_HIGH_WATER.cast(value, Integer.valueOf(Integer.MAX_VALUE)).intValue())));
- } else if (option == Options.CONNECTION_LOW_WATER) {
- old = Integer.valueOf(getLowWater(updateWaterMark(Options.CONNECTION_LOW_WATER.cast(value, Integer.valueOf(Integer.MAX_VALUE)).intValue(), -1)));
- } else {
- return null;
- }
- return option.cast(old);
- }
-
- private long updateWaterMark(int reqNewLowWater, int reqNewHighWater) {
- // at least one must be specified
- assert reqNewLowWater != -1 || reqNewHighWater != -1;
- // if both given, low must be less than high
- assert reqNewLowWater == -1 || reqNewHighWater == -1 || reqNewLowWater <= reqNewHighWater;
-
- long oldVal, newVal;
- int oldHighWater, oldLowWater;
- int newLowWater, newHighWater;
-
- do {
- oldVal = connectionStatus;
- oldLowWater = getLowWater(oldVal);
- oldHighWater = getHighWater(oldVal);
- newLowWater = reqNewLowWater == -1 ? oldLowWater : reqNewLowWater;
- newHighWater = reqNewHighWater == -1 ? oldHighWater : reqNewHighWater;
- // Make sure the new values make sense
- if (reqNewLowWater != -1 && newLowWater > newHighWater) {
- newHighWater = newLowWater;
- } else if (reqNewHighWater != -1 && newHighWater < newLowWater) {
- newLowWater = newHighWater;
- }
- // See if the change would be redundant
- if (oldLowWater == newLowWater && oldHighWater == newHighWater) {
- return oldVal;
- }
- newVal = (long)newLowWater << CONN_LOW_BIT | (long)newHighWater << CONN_HIGH_BIT;
- } while (! connectionStatusUpdater.compareAndSet(this, oldVal, newVal));
- return oldVal;
- }
-
- private static int getHighWater(final long value) {
- return (int) ((value & CONN_HIGH_MASK) >> CONN_HIGH_BIT);
- }
-
- private static int getLowWater(final long value) {
- return (int) ((value & CONN_LOW_MASK) >> CONN_LOW_BIT);
- }
-
- public NioSocketStreamConnection accept() throws IOException {
- final WorkerThread current = WorkerThread.getCurrent();
- if (current == null) {
- return null;
- }
- final BlockingQueue<SocketChannel> socketChannels = acceptQueues.get(current.getNumber());
- final SocketChannel accepted;
- boolean ok = false;
- try {
- accepted = socketChannels.poll();
- if (accepted != null) try {
- accepted.configureBlocking(false);
- final Socket socket = accepted.socket();
- socket.setKeepAlive(keepAlive != 0);
- socket.setOOBInline(oobInline != 0);
- socket.setTcpNoDelay(tcpNoDelay != 0);
- final int sendBuffer = this.sendBuffer;
- if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
- final SelectionKey selectionKey = current.registerChannel(accepted);
- final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(current, selectionKey, handle);
- newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
- newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
- ok = true;
- return newConnection;
- } finally {
- if (! ok) safeClose(accepted);
- }
- } catch (IOException e) {
- return null;
- } finally {
- if (! ok) {
- handle.freeConnection();
- }
- }
- // by contract, only a resume will do
- return null;
- }
-
- public String toString() {
- return String.format("TCP server (NIO) <%s>", Integer.toHexString(hashCode()));
- }
-
- public ChannelListener<? super QueuedNioTcpServer> getAcceptListener() {
- return acceptListener;
- }
-
- public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer> acceptListener) {
- this.acceptListener = acceptListener;
- }
-
- public ChannelListener.Setter<QueuedNioTcpServer> getAcceptSetter() {
- return new Setter<QueuedNioTcpServer>(this);
- }
-
- public boolean isOpen() {
- return channel.isOpen();
- }
-
- public SocketAddress getLocalAddress() {
- return socket.getLocalSocketAddress();
- }
-
- public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
- final SocketAddress address = getLocalAddress();
- return type.isInstance(address) ? type.cast(address) : null;
- }
-
- public void suspendAccepts() {
- handle.suspend(0);
- }
-
- public void resumeAccepts() {
- handle.resume(SelectionKey.OP_ACCEPT);
- }
-
- public boolean isAcceptResumed() {
- return handle.isResumed(SelectionKey.OP_ACCEPT);
- }
-
- public void wakeupAccepts() {
- tcpServerLog.logf(FQCN, Logger.Level.TRACE, null, "Wake up accepts on %s", this);
- resumeAccepts();
- handle.wakeup(SelectionKey.OP_ACCEPT);
- }
-
- public void awaitAcceptable() throws IOException {
- throw log.unsupported("awaitAcceptable");
- }
-
- public void awaitAcceptable(final long time, final TimeUnit timeUnit) throws IOException {
- throw log.unsupported("awaitAcceptable");
- }
-
- @Deprecated
- public XnioExecutor getAcceptThread() {
- return getIoThread();
- }
-
- void handleReady() {
- try {
- final SocketChannel accepted = channel.accept();
- boolean ok = false;
- if (accepted != null) try {
- final SocketAddress localAddress = accepted.getLocalAddress();
- int hash;
- if (localAddress instanceof InetSocketAddress) {
- final InetSocketAddress address = (InetSocketAddress) localAddress;
- hash = address.getAddress().hashCode() * 23 + address.getPort();
- } else if (localAddress instanceof LocalSocketAddress) {
- hash = ((LocalSocketAddress) localAddress).getName().hashCode();
- } else {
- hash = localAddress.hashCode();
- }
- final SocketAddress remoteAddress = accepted.getRemoteAddress();
- if (remoteAddress instanceof InetSocketAddress) {
- final InetSocketAddress address = (InetSocketAddress) remoteAddress;
- hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
- } else if (remoteAddress instanceof LocalSocketAddress) {
- hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
- } else {
- hash = localAddress.hashCode() * 23 + hash;
- }
- accepted.configureBlocking(false);
- final Socket socket = accepted.socket();
- socket.setKeepAlive(keepAlive != 0);
- socket.setOOBInline(oobInline != 0);
- socket.setTcpNoDelay(tcpNoDelay != 0);
- final int sendBuffer = this.sendBuffer;
- if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
- final WorkerThread ioThread = worker.getIoThread(hash);
- final SelectionKey selectionKey = ioThread.registerChannel(accepted);
- final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
- newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
- newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
- ok = true;
- final int number = ioThread.getNumber();
- final BlockingQueue<SocketChannel> queue = acceptQueues.get(number);
- queue.add(accepted);
- // todo: only execute if necessary
- ioThread.execute(acceptTask);
- } finally {
- if (! ok) safeClose(accepted);
- }
- } catch (IOException ignored) {
-
- }
- }
-}
diff --git a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java b/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
deleted file mode 100644
index 8a86b70..0000000
--- a/nio-impl/src/main/java/org/xnio/nio/QueuedNioTcpServerHandle.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2015 Red Hat, Inc. and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.xnio.nio;
-
-import static org.xnio.IoUtils.safeClose;
-
-import java.nio.channels.SelectionKey;
-
-/**
- * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
- */
-final class QueuedNioTcpServerHandle extends NioHandle implements ChannelClosed {
-
- private final QueuedNioTcpServer server;
-
- QueuedNioTcpServerHandle(final QueuedNioTcpServer server, final WorkerThread workerThread, final SelectionKey key, final int highWater, final int lowWater) {
- super(workerThread, key);
- this.server = server;
- }
-
- void handleReady(final int ops) {
- server.handleReady();
- }
-
- void forceTermination() {
- safeClose(server);
- }
-
- void terminated() {
- server.invokeCloseHandler();
- }
-
- public void channelClosed() {
- freeConnection();
- }
-
- void freeConnection() {
- // ignore for now
- }
-
- int getConnectionCount() {
- return -1;
- }
-}
diff --git a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
index 19ca1a0..5ccdb96 100644
--- a/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
+++ b/nio-impl/src/main/java/org/xnio/nio/WorkerThread.java
@@ -97,7 +97,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
static {
OLD_LOCKING = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.old-locking", "false")));
- THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.thread-safe-selection-keys", "false")));
+ THREAD_SAFE_SELECTION_KEYS = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.xnio.thread-safe-selection-keys", "false")));
}
WorkerThread(final NioXnioWorker worker, final Selector selector, final String name, final ThreadGroup group, final long stackSize, final int number) {
@@ -419,8 +419,6 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
}
}
- volatile boolean polling;
-
public void run() {
final Selector selector = this.selector;
try {
@@ -501,29 +499,11 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
selector.selectNow();
} else if (delayTime == Long.MAX_VALUE) {
selectorLog.tracef("Beginning select on %s", selector);
- polling = true;
- try {
- if (workQueue.peek() != null) {
- selector.selectNow();
- } else {
- selector.select();
- }
- } finally {
- polling = false;
- }
+ selector.select();
} else {
final long millis = 1L + delayTime / 1000000L;
selectorLog.tracef("Beginning select on %s (with timeout)", selector);
- polling = true;
- try {
- if (workQueue.peek() != null) {
- selector.selectNow();
- } else {
- selector.select(millis);
- }
- } finally {
- polling = false;
- }
+ selector.select(millis);
}
} catch (CancelledKeyException ignored) {
// Mac and other buggy implementations sometimes spits these out
@@ -590,7 +570,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
synchronized (workLock) {
selectorWorkQueue.add(command);
}
- if (polling) { // flag is always false if we're the same thread
+ if(currentThread() != this) {
selector.wakeup();
}
}
@@ -625,7 +605,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
queue.add(key);
if (queue.iterator().next() == key) {
// we're the next one up; poke the selector to update its delay time
- if (polling) { // flag is always false if we're the same thread
+ if(currentThread() != this) {
selector.wakeup();
}
}
@@ -685,7 +665,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
try {
return channel.register(selector, 0);
} finally {
- if (polling) selector.wakeup();
+ selector.wakeup();
}
} else {
final SynchTask task = new SynchTask();
@@ -764,7 +744,7 @@ final class WorkerThread extends XnioIoThread implements XnioExecutor {
} else {
try {
key.interestOps(key.interestOps() | ops);
- if (polling) selector.wakeup();
+ selector.wakeup();
} catch (CancelledKeyException ignored) {
}
}
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
index 19d4369..0ad5726 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/IllegalConnectionTestCase.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.xnio.OptionMap;
import org.xnio.Options;
@@ -63,7 +62,6 @@ public class IllegalConnectionTestCase {
}
@Test
- @Ignore
public void illegalAcceptThreads() throws IOException {
IllegalArgumentException expected;
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
index 126f238..0ff0675 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpChannelTestCase.java
@@ -30,7 +30,6 @@ import java.net.SocketAddress;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.xnio.ChannelListener;
import org.xnio.FutureResult;
@@ -81,8 +80,8 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
final IoFuture<ConnectedStreamChannel> connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, optionMap);
final FutureResult<ConnectedStreamChannel> accepted = new FutureResult<ConnectedStreamChannel>(xnioWorker);
- server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends ConnectedStreamChannel>>() {
- public void handleEvent(final AcceptingChannel<? extends ConnectedStreamChannel> channel) {
+ server.getAcceptThread().execute(new Runnable() {
+ public void run() {
try {
accepted.setResult(server.accept());
} catch (IOException e) {
@@ -90,7 +89,6 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
}
});
- server.resumeAccepts();
serverChannel = accepted.getIoFuture().get();
channel = connectedStreamChannel.get();
assertNotNull(serverChannel);
@@ -100,7 +98,6 @@ public class TcpChannelTestCase extends AbstractNioStreamChannelTest {
}
@Test
- @Ignore("unreliable")
public void optionSetup() throws IOException {
initChannels();
final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
index 13c1387..b997afb 100644
--- a/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
+++ b/nio-impl/src/test/java/org/xnio/nio/test/TcpConnectionTestCase.java
@@ -30,7 +30,6 @@ import java.net.SocketAddress;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.xnio.ChannelListener;
import org.xnio.FutureResult;
@@ -81,8 +80,8 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
final IoFuture<StreamConnection> openedConnection = xnioWorker.openStreamConnection(bindAddress, null, optionMap);
final FutureResult<StreamConnection> accepted = new FutureResult<StreamConnection>(xnioWorker);
- server.getAcceptSetter().set(new ChannelListener<AcceptingChannel<? extends StreamConnection>>() {
- public void handleEvent(final AcceptingChannel<? extends StreamConnection> channel) {
+ server.getIoThread().execute(new Runnable() {
+ public void run() {
try {
accepted.setResult(server.accept());
} catch (IOException e) {
@@ -90,7 +89,6 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
}
});
- server.resumeAccepts();
serverConnection = accepted.getIoFuture().get();
connection = openedConnection.get();
assertNotNull(serverConnection);
@@ -100,7 +98,6 @@ public class TcpConnectionTestCase extends AbstractStreamSinkSourceChannelTest<S
}
@Test
- @Ignore("unreliable")
public void optionSetup() throws IOException {
initChannels();
final Option<?>[] unsupportedOptions = OptionHelper.getNotSupportedOptions(Options.CLOSE_ABORT,
diff --git a/pom.xml b/pom.xml
index 5ca90db..09ee69f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
- <version>3.3.3.Final</version>
+ <version>3.3.2.Final</version>
<description>The aggregator POM of the XNIO project</description>
<modules>
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/jboss-xnio.git
More information about the pkg-java-commits
mailing list