[Git][java-team/jboss-xnio][upstream] New upstream version 3.8.5
Markus Koschany (@apo)
gitlab at salsa.debian.org
Sun Nov 28 16:19:52 GMT 2021
Markus Koschany pushed to branch upstream at Debian Java Maintainers / jboss-xnio
Commits:
9fe261ca by Markus Koschany at 2021-11-28T17:13:33+01:00
New upstream version 3.8.5
- - - - -
22 changed files:
- api/pom.xml
- api/src/main/java/org/xnio/StreamConnection.java
- api/src/main/java/org/xnio/_private/Messages.java
- api/src/main/java/org/xnio/ssl/JsseSslConduitEngine.java
- api/src/main/java/org/xnio/ssl/JsseSslStreamConnection.java
- api/src/main/java/org/xnio/ssl/JsseSslStreamSourceConduit.java
- api/src/test/java/org/xnio/channels/AbstractBlockingReadableByteChannelTest.java
- api/src/test/java/org/xnio/channels/AbstractBlockingWritableByteChannelTest.java
- nio-impl/pom.xml
- nio-impl/src/main/java/org/xnio/nio/Log.java
- nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
- nio-impl/src/test/java/org/xnio/nio/test/AbstractNioSslTcpTest.java
- nio-impl/src/test/java/org/xnio/nio/test/AbstractNioTcpTest.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslBufferExpansionTcpChannelTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslBufferExpansionTcpConnectionTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslRandomlyTimedBufferExpansionTcpChannelTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslRandomlyTimedBufferExpansionTcpConnectionTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslTcpChannelTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioSslTcpConnectionTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/NioStartTLSTcpConnectionTestCase.java
- nio-impl/src/test/java/org/xnio/nio/test/XnioWorkerTestCase.java
- pom.xml
Changes:
=====================================
api/pom.xml
=====================================
@@ -37,7 +37,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.8.4.Final</version>
+ <version>3.8.5.Final</version>
</parent>
<dependencies>
=====================================
api/src/main/java/org/xnio/StreamConnection.java
=====================================
@@ -18,6 +18,8 @@
package org.xnio;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.xnio.channels.CloseListenerSettable;
import org.xnio.conduits.ConduitStreamSinkChannel;
import org.xnio.conduits.ConduitStreamSourceChannel;
@@ -30,12 +32,18 @@ import static org.xnio._private.Messages.msg;
* A connection between peers.
*
* @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ * @author Flavia Rainone
*/
public abstract class StreamConnection extends Connection implements CloseListenerSettable<StreamConnection> {
+ /**
+ * An empty listener used as a flag, to indicate that close listener has been invoked.
+ */
+ private static final ChannelListener<? super StreamConnection> INVOKED_CLOSE_LISTENER_FLAG = (StreamConnection connection)->{};
+
private ConduitStreamSourceChannel sourceChannel;
private ConduitStreamSinkChannel sinkChannel;
- private ChannelListener<? super StreamConnection> closeListener;
+ private AtomicReference<ChannelListener<? super StreamConnection>> closeListener;
/**
* Construct a new instance.
@@ -44,18 +52,40 @@ public abstract class StreamConnection extends Connection implements CloseListen
*/
protected StreamConnection(final XnioIoThread thread) {
super(thread);
+ closeListener = new AtomicReference<>();
}
public void setCloseListener(final ChannelListener<? super StreamConnection> listener) {
- this.closeListener = listener;
+ ChannelListener<? super StreamConnection> currentListener;
+ ChannelListener<? super StreamConnection> newListener;
+ do {
+ newListener = listener;
+ currentListener = closeListener.get();
+ if (currentListener != null) {
+ // channel is closed, just invoke the new listener and do not update closeListener
+ if (currentListener == INVOKED_CLOSE_LISTENER_FLAG) {
+ ChannelListeners.invokeChannelListener(this, listener);
+ return;
+ } else {
+ newListener = mergeListeners(currentListener, listener);
+ }
+ }
+ } while (!closeListener.compareAndSet(currentListener, newListener));
+ }
+
+ private final ChannelListener<? super StreamConnection> mergeListeners(final ChannelListener<? super StreamConnection> listener1, final ChannelListener<? super StreamConnection> listener2) {
+ return (StreamConnection channel) -> {
+ listener1.handleEvent(channel);
+ listener2.handleEvent(channel);
+ };
}
public ChannelListener<? super StreamConnection> getCloseListener() {
- return closeListener;
+ return closeListener.get();
}
public ChannelListener.Setter<? extends StreamConnection> getCloseSetter() {
- return new Setter<StreamConnection>(this);
+ return new Setter<>(this);
}
/**
@@ -77,7 +107,9 @@ public abstract class StreamConnection extends Connection implements CloseListen
}
void invokeCloseListener() {
- ChannelListeners.invokeChannelListener(this, closeListener);
+ // use a flag to indicate that closeListener has been invoked
+ final ChannelListener<? super StreamConnection> listener = closeListener.getAndSet(INVOKED_CLOSE_LISTENER_FLAG);
+ ChannelListeners.invokeChannelListener(this, listener);
}
private static <T> T notNull(T orig) throws IllegalStateException {
=====================================
api/src/main/java/org/xnio/_private/Messages.java
=====================================
@@ -336,13 +336,13 @@ public interface Messages extends BasicLogger {
@LogMessage(level = ERROR)
void exceptionHandlerException(@Cause Throwable cause);
- @Message(id = 1009, value = "Failed to accept a connection on %s: %s")
+ @Message(id = 1009, value = "Failed to accept a connection on %s")
@LogMessage(level = ERROR)
- void acceptFailed(AcceptingChannel<? extends ConnectedChannel> channel, IOException reason);
+ void acceptFailed(AcceptingChannel<? extends ConnectedChannel> channel, @Cause IOException reason);
- @Message(id = 1010, value = "Failed to submit task to executor: %s (closing %s)")
+ @Message(id = 1010, value = "Failed to submit task to executor: (closing %s)")
@LogMessage(level = ERROR)
- void executorSubmitFailed(RejectedExecutionException cause, Channel channel);
+ void executorSubmitFailed(@Cause RejectedExecutionException cause, Channel channel);
// Trace
=====================================
api/src/main/java/org/xnio/ssl/JsseSslConduitEngine.java
=====================================
@@ -19,16 +19,6 @@
package org.xnio.ssl;
-import static java.lang.Thread.currentThread;
-import static java.util.concurrent.locks.LockSupport.park;
-import static java.util.concurrent.locks.LockSupport.parkNanos;
-import static java.util.concurrent.locks.LockSupport.unpark;
-import static org.xnio.Bits.allAreClear;
-import static org.xnio.Bits.allAreSet;
-import static org.xnio.Bits.anyAreSet;
-import static org.xnio.Bits.intBitMask;
-import static org.xnio._private.Messages.msg;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -36,6 +26,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.parkNanos;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -45,14 +40,18 @@ import javax.net.ssl.SSLSession;
import org.jboss.logging.Logger;
-import org.xnio.BufferAllocator;
import org.xnio.Buffers;
-import org.xnio.ByteBufferSlicePool;
import org.xnio.Pool;
import org.xnio.Pooled;
import org.xnio.conduits.StreamSinkConduit;
import org.xnio.conduits.StreamSourceConduit;
+import static org.xnio.Bits.allAreClear;
+import static org.xnio.Bits.allAreSet;
+import static org.xnio.Bits.anyAreSet;
+import static org.xnio.Bits.intBitMask;
+import static org.xnio._private.Messages.msg;
+
/**
* {@link SSLEngine} wrapper, used by Jsse SSL conduits.
*
@@ -472,7 +471,6 @@ final class JsseSslConduitEngine {
// and flush any wrapped data we may have left
if (doFlush()) {
if (result.getStatus() == SSLEngineResult.Status.CLOSED) {
- closeOutbound();
return false;
}
if (!handleWrapResult(result = engineWrap(Buffers.EMPTY_BYTE_BUFFER, buffer), true) || !doFlush()) {
@@ -780,9 +778,6 @@ final class JsseSslConduitEngine {
int oldState, newState;
oldState = stateUpdater.get(this);
if (allAreSet(oldState, WRITE_COMPLETE)) {
- if (engine.isOutboundDone()) {
- connection.writeClosed();
- }
return true;
}
synchronized (getWrapLock()) {
@@ -791,9 +786,6 @@ final class JsseSslConduitEngine {
return false;
}
} else {
- if (engine.isOutboundDone()) {
- connection.writeClosed();
- }
return true;
}
}
@@ -802,10 +794,7 @@ final class JsseSslConduitEngine {
while (! stateUpdater.compareAndSet(this, oldState, newState)) {
oldState = stateUpdater.get(this);
if (allAreSet(oldState, WRITE_COMPLETE)) {
- if (engine.isOutboundDone()) {
- connection.writeClosed();
- }
- return true;//sinkConduit.flush();
+ return true;
}
newState = oldState | WRITE_COMPLETE;
}
@@ -813,9 +802,6 @@ final class JsseSslConduitEngine {
if (allAreSet(oldState, READ_SHUT_DOWN)) {
closeEngine();
}
- if (engine.isOutboundDone()) {
- connection.writeClosed();
- }
return true;
}
@@ -891,6 +877,10 @@ final class JsseSslConduitEngine {
}
}
} finally {
+ sourceConduit.terminateReads();
+ sinkConduit.terminateWrites();
+ connection.readClosed();
+ connection.writeClosed();
readBuffer.free();
receiveBuffer.free();
sendBuffer.free();
@@ -903,16 +893,18 @@ final class JsseSslConduitEngine {
* @throws IOException if an IO exception occurs
*/
public void closeOutbound() throws IOException {
+ if (isOutboundClosed()) //idempotent
+ return;
int old = setFlags(WRITE_SHUT_DOWN);
+ boolean sentCloseMessage = true;
try {
if (allAreClear(old, WRITE_SHUT_DOWN)) {
engine.closeOutbound();
synchronized (getWrapLock()) {
- wrapCloseMessage();
- flush();
+ sentCloseMessage = wrapCloseMessage() && flush();
}
}
- if (!allAreClear(old, READ_SHUT_DOWN)) {
+ if (!allAreClear(old, READ_SHUT_DOWN) && sentCloseMessage) {
closeEngine();
}
} catch (Exception e) {
@@ -925,7 +917,7 @@ final class JsseSslConduitEngine {
if(e instanceof IOException) {
throw (IOException) e;
} else {
- throw (RuntimeException)e;
+ throw (RuntimeException) e;
}
}
}
@@ -937,23 +929,18 @@ final class JsseSslConduitEngine {
* @throws IOException
*/
void close() throws IOException {
- if (isFirstHandshake()) {
- setFlags(WRITE_SHUT_DOWN|WRITE_COMPLETE|READ_SHUT_DOWN);
- closeEngine();
- } else {
+ try {
+ closeInbound();
+ } catch (Throwable t) {
try {
- closeInbound();
- } catch (Throwable t) {
- try {
- closeOutbound();
- } catch (Throwable t2) {
- t2.addSuppressed(t);
- throw t2;
- }
- throw t;
+ closeOutbound();
+ } catch (Throwable t2) {
+ t2.addSuppressed(t);
+ throw t2;
}
- closeOutbound();
+ throw t;
}
+ closeOutbound();
}
/**
@@ -1038,19 +1025,15 @@ final class JsseSslConduitEngine {
* @throws IOException if an IO exception occurs
*/
public void closeInbound() throws IOException {
- connection.readClosed();
int old = setFlags(READ_SHUT_DOWN);
+ boolean sentCloseMessage = true;
try {
- if (allAreClear(old, READ_SHUT_DOWN)) {
- sourceConduit.terminateReads();
- }
if (allAreSet(old, WRITE_SHUT_DOWN) && !allAreSet(old, WRITE_COMPLETE)) {
synchronized (getWrapLock()) {
- wrapCloseMessage();
- flush();
+ sentCloseMessage = wrapCloseMessage() && flush();
}
}
- if (allAreSet(old, WRITE_COMPLETE)) {
+ if (allAreSet(old, WRITE_COMPLETE) && sentCloseMessage) {
closeEngine();
}
} catch (Exception e) {
=====================================
api/src/main/java/org/xnio/ssl/JsseSslStreamConnection.java
=====================================
@@ -17,18 +17,18 @@
*/
package org.xnio.ssl;
-import static org.xnio.IoUtils.safeClose;
-
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
+import org.xnio.Connection;
import org.xnio.Option;
import org.xnio.Options;
import org.xnio.Pool;
@@ -37,6 +37,9 @@ import org.xnio.StreamConnection;
import org.xnio.conduits.StreamSinkConduit;
import org.xnio.conduits.StreamSourceConduit;
+import static org.xnio.Bits.allAreSet;
+import static org.xnio.IoUtils.safeClose;
+
/**
* StreamConnection with SSL support.
*
@@ -62,6 +65,16 @@ public final class JsseSslStreamConnection extends SslConnection {
*/
private final ChannelListener.SimpleSetter<SslConnection> handshakeSetter = new ChannelListener.SimpleSetter<SslConnection>();
+ @SuppressWarnings("unused")
+ private volatile int state;
+
+ private static final int FLAG_READ_CLOSE_REQUESTED = 0b0001;
+ private static final int FLAG_WRITE_CLOSE_REQUESTED = 0b0010;
+ private static final int FLAG_READ_CLOSED = 0b0100;
+ private static final int FLAG_WRITE_CLOSED = 0b1000;
+
+ private static final AtomicIntegerFieldUpdater<JsseSslStreamConnection> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(JsseSslStreamConnection.class, "state");
+
public JsseSslStreamConnection(StreamConnection connection, SSLEngine sslEngine, final boolean startTls) {
this(connection, sslEngine, JsseXnioSsl.bufferPool, JsseXnioSsl.bufferPool, startTls);
}
@@ -75,6 +88,8 @@ public final class JsseSslStreamConnection extends SslConnection {
tls = ! startTls;
setSinkConduit(new JsseSslStreamSinkConduit(sinkConduit, sslConduitEngine, tls));
setSourceConduit(new JsseSslStreamSourceConduit(sourceConduit, sslConduitEngine, tls));
+ getSourceChannel().setCloseListener(channel -> readClosed());
+ getSinkChannel().setCloseListener(channel -> writeClosed());
}
/** {@inheritDoc} */
@@ -104,28 +119,33 @@ public final class JsseSslStreamConnection extends SslConnection {
/** {@inheritDoc} */
@Override
protected void closeAction() throws IOException {
- if (tls) {
- try {
- getSinkChannel().getConduit().truncateWrites();
- } catch (IOException e) {
+ // when invoked from the outside world, like an attempt to close the connection
+ // we need to first invoke close on the engine
+ // when the engine is closed, it will invoke closeAction again, then we enter else block below
+ if (!sslConduitEngine.isClosed())
+ sslConduitEngine.close();
+ else {
+ if (tls) {
+ try {
+ getSinkChannel().getConduit().terminateWrites();
+ } catch (IOException e) {
+ try {
+ getSourceChannel().getConduit().terminateReads();
+ } catch (IOException ignored) {
+ }
+ safeClose(connection);
+ throw e;
+ }
try {
getSourceChannel().getConduit().terminateReads();
- } catch (IOException ignored) {
+ } catch (IOException e) {
+ safeClose(connection);
+ throw e;
}
- safeClose(connection);
- throw e;
+ super.closeAction();
}
- try {
- getSourceChannel().getConduit().terminateReads();
- } catch (IOException e) {
- safeClose(connection);
- throw e;
- }
- } else {
- //still need to close the engine, as it is allocated eagerly
- sslConduitEngine.close();
+ connection.close();
}
- connection.close();
}
/** {@inheritDoc} */
@@ -189,11 +209,64 @@ public final class JsseSslStreamConnection extends SslConnection {
}
protected boolean readClosed() {
- return super.readClosed();
+ final boolean closeRequestedNow;
+ synchronized(this) {
+ int oldVal, newVal;
+ do {
+ oldVal = state;
+ if (allAreSet(oldVal, FLAG_READ_CLOSE_REQUESTED)) {
+ break;
+ }
+ newVal = oldVal | FLAG_READ_CLOSE_REQUESTED;
+ } while (! stateUpdater.compareAndSet(this, oldVal, newVal));
+ closeRequestedNow = allAreSet(oldVal, FLAG_READ_CLOSE_REQUESTED);
+ if (sslConduitEngine.isClosed() || !tls/* || sslConduitEngine.isFirstHandshake()*/) {
+ do {
+ oldVal = state;
+ if (allAreSet(oldVal, FLAG_READ_CLOSED)) {
+ return false;
+ }
+ newVal = oldVal | FLAG_READ_CLOSED;
+ } while (! stateUpdater.compareAndSet(this, oldVal, newVal));
+ if (allAreSet(oldVal, FLAG_READ_CLOSED)) {
+ return false;
+ }
+ } else return closeRequestedNow;
+ }
+ // invoke super.readClosed when we know that engine is closed, with previous state check we make sure we don't invoke this twice
+ super.readClosed();
+ return closeRequestedNow;
}
protected boolean writeClosed() {
- return super.writeClosed();
+ final boolean closeRequestedNow;
+ synchronized(this) {
+ int oldVal, newVal;
+ do {
+ oldVal = state;
+ if (allAreSet(oldVal, FLAG_WRITE_CLOSE_REQUESTED)) {
+ break;
+ }
+ newVal = oldVal | FLAG_WRITE_CLOSE_REQUESTED;
+ } while (! stateUpdater.compareAndSet(this, oldVal, newVal));
+ closeRequestedNow = allAreSet(oldVal, FLAG_WRITE_CLOSE_REQUESTED);
+ if (sslConduitEngine.isClosed() || !tls/* || sslConduitEngine.isFirstHandshake()*/) {
+ do {
+ oldVal = state;
+ if (allAreSet(oldVal, FLAG_WRITE_CLOSED)) {
+ return false;
+ }
+ newVal = oldVal | FLAG_WRITE_CLOSED;
+ } while (! stateUpdater.compareAndSet(this, oldVal, newVal));
+ if (allAreSet(oldVal, FLAG_WRITE_CLOSED)) {
+ return false;
+ }
+ } else return closeRequestedNow;
+ }
+ // invoke super.writeClosed when we know that engine is closed, with previous state check we make sure we don't invoke this twice,
+ // and we also get the appropriate return value, that is independent if we internally invoke writeClosed or not
+ super.writeClosed();
+ return closeRequestedNow;
}
/**
@@ -206,4 +279,18 @@ public final class JsseSslStreamConnection extends SslConnection {
}
ChannelListeners.<SslConnection>invokeChannelListener(this, listener);
}
+
+ @Override
+ public boolean isReadShutdown() {
+ return allAreSet(state, FLAG_READ_CLOSE_REQUESTED);
+ }
+
+ /**
+ * Determine whether writes have been shut down on this connection.
+ *
+ * @return {@code true} if writes were shut down
+ */
+ public boolean isWriteShutdown() {
+ return allAreSet(state, FLAG_WRITE_CLOSE_REQUESTED);
+ }
}
=====================================
api/src/main/java/org/xnio/ssl/JsseSslStreamSourceConduit.java
=====================================
@@ -79,8 +79,17 @@ final class JsseSslStreamSourceConduit extends AbstractStreamSourceConduit<Strea
if ((!sslEngine.isDataAvailable() && sslEngine.isInboundClosed()) || sslEngine.isClosed()) {
return -1;
}
+ final boolean attemptToUnwrapFirst;
+ synchronized(sslEngine.getUnwrapLock()) {
+ attemptToUnwrapFirst = sslEngine.getUnwrapBuffer().remaining() > 0;
+ }
+ if (attemptToUnwrapFirst) {
+ final int unwrapResult = sslEngine.unwrap(dst);
+ if (unwrapResult > 0) {
+ return unwrapResult;
+ }
+ }
final int readResult;
- final int unwrapResult;
synchronized(sslEngine.getUnwrapLock()) {
final ByteBuffer unwrapBuffer = sslEngine.getUnwrapBuffer().compact();
try {
@@ -89,7 +98,7 @@ final class JsseSslStreamSourceConduit extends AbstractStreamSourceConduit<Strea
unwrapBuffer.flip();
}
}
- unwrapResult = sslEngine.unwrap(dst);
+ final int unwrapResult = sslEngine.unwrap(dst);
if (unwrapResult == 0 && readResult == -1) {
terminateReads();
return -1;
@@ -142,20 +151,21 @@ final class JsseSslStreamSourceConduit extends AbstractStreamSourceConduit<Strea
@Override
public void terminateReads() throws IOException {
- if (tls) {
+ if (!tls) {
+ super.terminateReads();
+ return;
+ }
+ try {
+ sslEngine.closeInbound();
+ } catch (IOException ex) {
try {
- sslEngine.closeInbound();
- } catch (IOException ex) {
- try {
- super.terminateReads();
- } catch (IOException e2) {
- e2.addSuppressed(ex);
- throw e2;
- }
- throw ex;
+ super.terminateReads();
+ } catch (IOException e2) {
+ e2.addSuppressed(ex);
+ throw e2;
}
+ throw ex;
}
- super.terminateReads();
}
@Override
=====================================
api/src/test/java/org/xnio/channels/AbstractBlockingReadableByteChannelTest.java
=====================================
@@ -278,7 +278,7 @@ public abstract class AbstractBlockingReadableByteChannelTest<T extends Scatteri
@Test
public void readBlocksUntilTimeoutWithBytearray5() throws Exception {
final T blockingChannel = createBlockingReadableByteChannel(channelMock);
- setReadTimeout(blockingChannel, 30000, TimeUnit.MICROSECONDS);
+ setReadTimeout(blockingChannel, 300000, TimeUnit.MICROSECONDS);
final ReadToBufferArray readRunnable = new ReadToBufferArray(blockingChannel);
final Thread readThread = new Thread(readRunnable);
readThread.start();
=====================================
api/src/test/java/org/xnio/channels/AbstractBlockingWritableByteChannelTest.java
=====================================
@@ -127,7 +127,7 @@ public abstract class AbstractBlockingWritableByteChannelTest<T extends Gatherin
@Test
public void writeBlocksUntilTimeout3() throws Exception {
- final T blockingChannel = createBlockingWritableByteChannel(channelMock, 0, TimeUnit.NANOSECONDS, 30000000, TimeUnit.NANOSECONDS);
+ final T blockingChannel = createBlockingWritableByteChannel(channelMock, 0, TimeUnit.NANOSECONDS, 300000000, TimeUnit.NANOSECONDS);
final Write writeRunnable = new Write(blockingChannel, "write... this");
final Thread writeThread = new Thread(writeRunnable);
writeThread.start();
@@ -305,7 +305,7 @@ public abstract class AbstractBlockingWritableByteChannelTest<T extends Gatherin
@Test
public void writeBufferArrayBlocksUntilTimeout5() throws Exception {
final T blockingChannel = createBlockingWritableByteChannel(channelMock);
- setWriteTimeout(blockingChannel, 2, TimeUnit.MICROSECONDS);
+ setWriteTimeout(blockingChannel, 20000, TimeUnit.MICROSECONDS);
final WriteBufferArray writeRunnable = new WriteBufferArray(blockingChannel, "2", "microseconds");
final Thread writeThread = new Thread(writeRunnable);
writeThread.start();
=====================================
nio-impl/pom.xml
=====================================
@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.8.4.Final</version>
+ <version>3.8.5.Final</version>
</parent>
<properties>
=====================================
nio-impl/src/main/java/org/xnio/nio/Log.java
=====================================
@@ -81,8 +81,8 @@ interface Log extends BasicLogger {
InterruptedIOException interruptedIO(@Field int bytesTransferred);
- @Message(id = 815, value = "Worker is shut down")
- ClosedWorkerException workerShutDown();
+ @Message(id = 815, value = "Worker is shut down: %s")
+ ClosedWorkerException workerShutDown(NioXnioWorker worker);
// Unsupported implementation operations - cross-check with xnio-api
=====================================
nio-impl/src/main/java/org/xnio/nio/NioXnioWorker.java
=====================================
@@ -244,7 +244,7 @@ final class NioXnioWorker extends XnioWorker {
void checkShutdown() throws ClosedWorkerException {
if (isShutdown())
- throw log.workerShutDown();
+ throw log.workerShutDown(this);
}
void closeResource() {
=====================================
nio-impl/src/test/java/org/xnio/nio/test/AbstractNioSslTcpTest.java
=====================================
@@ -17,9 +17,6 @@
*/
package org.xnio.nio.test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -28,12 +25,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
import org.junit.Test;
import org.xnio.ChannelListener;
import org.xnio.channels.ConnectedChannel;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* Superclass for all ssl tcp test cases.
*
@@ -165,6 +166,7 @@ public abstract class AbstractNioSslTcpTest<T extends ConnectedChannel, R extend
log.info("client closing connection");
clientOK.set(true);
channel.close();
+ //sourceChannel.shutdownReads();
return;
}
return;
@@ -204,6 +206,7 @@ public abstract class AbstractNioSslTcpTest<T extends ConnectedChannel, R extend
if (c == -1) {
log.info("server closing connection");
channel.close();
+ //sourceChannel.shutdownReads();
serverOK.set(true);
}
} catch (Throwable t) {
@@ -411,4 +414,20 @@ public abstract class AbstractNioSslTcpTest<T extends ConnectedChannel, R extend
assertEquals(clientSent.get(), serverReceived.get());
}
+
+ @Test
+ public void closeConnectionWithoutHandshake() throws Exception {
+ log.info("Test: closeConnectionWithoutHandshake");
+ final CountDownLatch latch = new CountDownLatch(2);
+ doConnectionTest(() -> log.info("Do nothing, go ahead and close it"), channel -> {
+ channel.getCloseSetter().set((ChannelListener<ConnectedChannel>) channel1 -> latch.countDown());
+ setReadListener(channel, sourceChannel -> log.info("client read event"));
+ setWriteListener(channel, sinkChannel -> log.info("client write event"));
+ }, channel -> {
+ channel.getCloseSetter().set((ChannelListener<ConnectedChannel>) channel12 -> latch.countDown());
+ setReadListener(channel, sourceChannel -> log.info("server read event"));
+ setWriteListener(channel, sinkChannel -> log.info("server write event"));
+ });
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/AbstractNioTcpTest.java
=====================================
@@ -17,16 +17,10 @@
*/
package org.xnio.nio.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.EOFException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.logging.Logger;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +48,9 @@ import org.xnio.channels.ConnectedChannel;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* Abstract test for TCP connected channels.
*
@@ -176,6 +174,36 @@ public abstract class AbstractNioTcpTest<T extends ConnectedChannel, R extends S
}, null, ChannelListeners.closingChannelListener());
}
+ @Test
+ public void closeWorker() throws Exception {
+ log.info("Test: closeWorker");
+ final Xnio xnio = Xnio.getInstance("nio", AbstractNioTcpTest.class.getClassLoader());
+ final XnioWorker worker;
+ if (threads == 1) {
+ worker = xnio.createWorker(OptionMap.create(Options.READ_TIMEOUT, 10000, Options.WRITE_TIMEOUT, 10000));
+ } else {
+ worker = xnio.createWorker(OptionMap.create(Options.WORKER_IO_THREADS, threads));
+ }
+ CountDownLatch latch = new CountDownLatch(2);
+ ChannelListener<? super T> handler = connection -> {
+ connection.getCloseSetter().set((ChannelListener) connection1 -> latch.countDown());
+ };
+ try {
+ final AcceptingChannel<? extends T> server = startServer(worker, handler);
+ try {
+ final IoFuture<? extends T> ioFuture = connect(worker,
+ new InetSocketAddress(Inet4Address.getByAddress(new byte[] { 127, 0, 0, 1 }), SERVER_PORT),
+ new CatchingChannelListener<T>(handler, problems), null, clientOptionMap);
+ ioFuture.get();
+ } finally {
+ IoUtils.safeClose(server);
+ }
+ } finally {
+ worker.shutdown();
+ worker.awaitTermination(1L, TimeUnit.MINUTES);
+ }
+ }
+
@Test
public void clientClose() throws Exception {
log.info("Test: clientClose");
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslBufferExpansionTcpChannelTestCase.java
=====================================
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.xnio.ChannelListener;
@@ -69,6 +70,11 @@ public class NioSslBufferExpansionTcpChannelTestCase extends
}
}
+ @Before
+ public void initXnioSsl() throws Exception {
+ xnioSsl = Xnio.getInstance("nio", NioSslBufferExpansionTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@SuppressWarnings("deprecation")
@Override
protected AcceptingChannel<? extends ConnectedSslStreamChannel> createServer(XnioWorker worker, InetSocketAddress address,
@@ -114,9 +120,6 @@ public class NioSslBufferExpansionTcpChannelTestCase extends
channel.shutdownWrites();
}
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super ConnectedSslStreamChannel> clientHandler, final ChannelListener<? super ConnectedSslStreamChannel> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslBufferExpansionTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
+ @Override @Ignore
+ public void serverClose() {}
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslBufferExpansionTcpConnectionTestCase.java
=====================================
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.xnio.ChannelListener;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
@@ -68,6 +70,11 @@ public class NioSslBufferExpansionTcpConnectionTestCase extends
}
}
+ @Before
+ public void initXnioSsl() throws Exception {
+ xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@Override
protected AcceptingChannel<? extends SslConnection> createServer(XnioWorker worker, InetSocketAddress address,
ChannelListener<AcceptingChannel<SslConnection>> openListener, OptionMap optionMap) throws IOException {
@@ -108,12 +115,10 @@ public class NioSslBufferExpansionTcpConnectionTestCase extends
@Override
protected void shutdownWrites(SslConnection connection) throws IOException {
+ log.info("server shutting down writes on " + connection.getSinkChannel());
connection.getSinkChannel().shutdownWrites();
}
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super SslConnection> clientHandler, final ChannelListener<? super SslConnection> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
+ @Override @Ignore
+ public void serverClose() {}
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslRandomlyTimedBufferExpansionTcpChannelTestCase.java
=====================================
@@ -22,8 +22,11 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.xnio.ChannelListener;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
@@ -51,6 +54,11 @@ public class NioSslRandomlyTimedBufferExpansionTcpChannelTestCase extends Abstra
private static final String DEFAULT_KEY_STORE = "keystore.jks";
private static final String DEFAULT_KEY_STORE_PASSWORD = "jboss-remoting-test";
+ @Before
+ public void initXnioSsl() throws GeneralSecurityException {
+ xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@Override
protected AcceptingChannel<? extends ConnectedSslStreamChannel> startServer(XnioWorker worker, final ChannelListener<? super ConnectedSslStreamChannel> serverHandler) throws
IOException {
@@ -133,9 +141,6 @@ public class NioSslRandomlyTimedBufferExpansionTcpChannelTestCase extends Abstra
channel.shutdownWrites();
}
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super ConnectedSslStreamChannel> clientHandler, final ChannelListener<? super ConnectedSslStreamChannel> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
+ @Override @Ignore
+ public void serverClose() {}
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslRandomlyTimedBufferExpansionTcpConnectionTestCase.java
=====================================
@@ -22,8 +22,11 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.xnio.ChannelListener;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
@@ -53,6 +56,11 @@ public class NioSslRandomlyTimedBufferExpansionTcpConnectionTestCase extends Abs
private static final String DEFAULT_KEY_STORE = "keystore.jks";
private static final String DEFAULT_KEY_STORE_PASSWORD = "jboss-remoting-test";
+ @Before
+ public void initXnioSsl() throws GeneralSecurityException {
+ xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@Override
protected AcceptingChannel<? extends SslConnection> startServer(XnioWorker worker, final ChannelListener<? super SslConnection> serverHandler) throws
IOException {
@@ -133,9 +141,6 @@ public class NioSslRandomlyTimedBufferExpansionTcpConnectionTestCase extends Abs
connection.getSinkChannel().shutdownWrites();
}
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super SslConnection> clientHandler, final ChannelListener<? super SslConnection> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
+ @Override @Ignore
+ public void serverClose() {}
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslTcpChannelTestCase.java
=====================================
@@ -21,7 +21,9 @@ package org.xnio.nio.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.xnio.ChannelListener;
import org.xnio.IoFuture;
@@ -66,6 +68,11 @@ public class NioSslTcpChannelTestCase extends AbstractNioSslTcpTest<ConnectedSsl
}
}
+ @Before
+ public void initXnioSsl() throws GeneralSecurityException {
+ xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@SuppressWarnings("deprecation")
@Override
protected AcceptingChannel<? extends ConnectedSslStreamChannel> createServer(XnioWorker worker, InetSocketAddress address,
@@ -110,10 +117,4 @@ public class NioSslTcpChannelTestCase extends AbstractNioSslTcpTest<ConnectedSsl
protected void shutdownWrites(ConnectedSslStreamChannel channel) throws IOException {
channel.shutdownWrites();
}
-
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super ConnectedSslStreamChannel> clientHandler, final ChannelListener<? super ConnectedSslStreamChannel> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioSslTcpConnectionTestCase.java
=====================================
@@ -20,7 +20,9 @@ package org.xnio.nio.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.xnio.ChannelListener;
import org.xnio.IoFuture;
@@ -66,6 +68,11 @@ public class NioSslTcpConnectionTestCase extends AbstractNioSslTcpTest<SslConnec
}
}
+ @Before
+ public void initXnioSsl() throws GeneralSecurityException {
+ xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
+ }
+
@Override
protected AcceptingChannel<? extends SslConnection> createServer(XnioWorker worker, InetSocketAddress address,
ChannelListener<AcceptingChannel<SslConnection>> openListener, OptionMap optionMap) throws IOException {
@@ -108,10 +115,4 @@ public class NioSslTcpConnectionTestCase extends AbstractNioSslTcpTest<SslConnec
protected void shutdownWrites(SslConnection connection) throws IOException {
connection.getSinkChannel().shutdownWrites();
}
-
- @Override
- protected void doConnectionTest(final Runnable body, final ChannelListener<? super SslConnection> clientHandler, final ChannelListener<? super SslConnection> serverHandler) throws Exception {
- xnioSsl = Xnio.getInstance("nio", NioSslTcpChannelTestCase.class.getClassLoader()).getSslProvider(OptionMap.EMPTY);
- super.doConnectionTest(body, clientHandler, serverHandler);
- }
}
=====================================
nio-impl/src/test/java/org/xnio/nio/test/NioStartTLSTcpConnectionTestCase.java
=====================================
@@ -58,7 +58,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
@Test
public void oneWayTransfer3() throws Exception {
- log.info("Test: oneWayTransfer");
+ log.info("Test: oneWayTransfer3");
final CountDownLatch latch = new CountDownLatch(2);
final AtomicInteger clientSent = new AtomicInteger(0);
final AtomicInteger serverReceived = new AtomicInteger(0);
@@ -377,6 +377,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
}
if (c == -1) {
log.info("client shutdown reads");
+ //sourceChannel.shutdownReads();
connection.close();
}
} catch (Throwable t) {
@@ -419,7 +420,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
try {
if (sinkChannel.flush()) {
try {
- log.info("client closing channel");
+ log.info("client shutdown writes on " + sinkChannel);
sinkChannel.shutdownWrites();
} catch (Throwable t) {
t.printStackTrace();
@@ -475,6 +476,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
}
if (c == -1) {
log.info("server shutdown reads");
+ //sourceChannel.shutdownReads();
connection.close();
}
} catch (Throwable t) {
@@ -497,7 +499,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
}
return false;
}
- if (clientHandshakeStarted.get()) {
+ if (serverHandshakeStarted.get()) {
return true;
}
return false;
@@ -519,7 +521,7 @@ public class NioStartTLSTcpConnectionTestCase extends NioSslTcpConnectionTestCas
try {
if (sinkChannel.flush()) {
try {
- log.info("server closing channel");
+ log.info("server shutdown writes");
sinkChannel.shutdownWrites();
} catch (Throwable t) {
t.printStackTrace();
=====================================
nio-impl/src/test/java/org/xnio/nio/test/XnioWorkerTestCase.java
=====================================
@@ -436,14 +436,24 @@ public class XnioWorkerTestCase {
final XnioWorker xnioWorker = xnio.createWorker(OptionMap.EMPTY);
final TestChannelListener<StreamConnection> channelListener = new TestChannelListener<StreamConnection>();
final TestChannelListener<BoundChannel> bindListener = new TestChannelListener<BoundChannel>();
- final IoFuture<StreamConnection> connectionFuture1 = xnioWorker.acceptStreamConnection(bindAddress, channelListener, bindListener, OptionMap.EMPTY);
- final IoFuture<StreamConnection> connection2 = xnioWorker.openStreamConnection(bindAddress, null, OptionMap.EMPTY);
+ IoFuture<StreamConnection> connectionFuture1 = xnioWorker.acceptStreamConnection(bindAddress, channelListener, bindListener, OptionMap.EMPTY);
+ IoFuture<StreamConnection> connection2 = xnioWorker.openStreamConnection(bindAddress, null, OptionMap.EMPTY);
assertNotNull(connectionFuture1);
assertNotNull(connection2);
connectionFuture1.cancel();
+ // we were beaten, couldn't cancel the connection future
+ while (connectionFuture1.getStatus() != IoFuture.Status.CANCELLED) {
+ StreamConnection connection1 = connectionFuture1.get();
+ assertNotNull(connection1);
+ connection2.get().close();
+ // try again once more
+ connectionFuture1 = xnioWorker.acceptStreamConnection(bindAddress, channelListener, bindListener, OptionMap.EMPTY);
+ connection2 = xnioWorker.openStreamConnection(bindAddress, null, OptionMap.EMPTY);
+ }
+
CancellationException expected = null;
try {
connectionFuture1.get();
@@ -466,14 +476,25 @@ public class XnioWorkerTestCase {
final XnioWorker xnioWorker = xnio.createWorker(OptionMap.EMPTY);
final TestChannelListener<ConnectedStreamChannel> channelListener = new TestChannelListener<ConnectedStreamChannel>();
final TestChannelListener<BoundChannel> bindListener = new TestChannelListener<BoundChannel>();
- final IoFuture<ConnectedStreamChannel> channelFuture = xnioWorker.acceptStream(bindAddress, channelListener, bindListener, OptionMap.EMPTY);
- final IoFuture<ConnectedStreamChannel> connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, OptionMap.EMPTY);
+ IoFuture<ConnectedStreamChannel> channelFuture = xnioWorker.acceptStream(bindAddress, channelListener, bindListener, OptionMap.EMPTY);
+ IoFuture<ConnectedStreamChannel> connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, OptionMap.EMPTY);
assertNotNull(connectedStreamChannel);
assertNotNull(channelFuture);
channelFuture.cancel();
+ // we were beaten, couldn't cancel the channel future
+ while (channelFuture.getStatus() != IoFuture.Status.CANCELLED) {
+ ConnectedStreamChannel channel = channelFuture.get();
+ assertNotNull(channel);
+ connectedStreamChannel.get().close();
+ channel.close();
+ // try again once more
+ channelFuture = xnioWorker.acceptStream(bindAddress, channelListener, bindListener, OptionMap.EMPTY).cancel();
+ connectedStreamChannel = xnioWorker.connectStream(bindAddress, null, OptionMap.EMPTY);
+ }
+
CancellationException expected = null;
try {
channelFuture.get();
=====================================
pom.xml
=====================================
@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
- <version>3.8.4.Final</version>
+ <version>3.8.5.Final</version>
<description>The aggregator POM of the XNIO project</description>
<licenses>
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/-/commit/9fe261cad2ac4e4b7d77820e86050b13f12e82b1
--
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/-/commit/9fe261cad2ac4e4b7d77820e86050b13f12e82b1
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20211128/b3e0c5b2/attachment.htm>
More information about the pkg-java-commits mailing list