[Git][java-team/jboss-xnio][upstream] New upstream version 3.8.7
Markus Koschany (@apo)
gitlab at salsa.debian.org
Wed May 4 12:17:35 BST 2022
Markus Koschany pushed to branch upstream at Debian Java Maintainers / jboss-xnio
Commits:
42d094b0 by Markus Koschany at 2022-05-04T13:11:33+02:00
New upstream version 3.8.7
- - - - -
13 changed files:
- api/pom.xml
- api/src/main/java/org/xnio/XnioWorker.java
- api/src/main/java/org/xnio/channels/Channels.java
- api/src/main/java/org/xnio/conduits/ConduitStreamSinkChannel.java
- api/src/main/java/org/xnio/conduits/Conduits.java
- api/src/main/java/org/xnio/ssl/JsseSslConduitEngine.java
- api/src/test/java/org/xnio/channels/ChannelsTestCase.java
- + api/src/test/java/org/xnio/conduits/ConduitsTestCase.java
- api/src/test/java/org/xnio/mock/ConnectedStreamChannelMock.java
- + api/src/test/java/org/xnio/mock/MessageConduitMock.java
- + api/src/test/java/org/xnio/mock/ReadableByteChannelMock.java
- nio-impl/pom.xml
- pom.xml
Changes:
=====================================
api/pom.xml
=====================================
@@ -37,7 +37,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.8.6.Final</version>
+ <version>3.8.7.Final</version>
</parent>
<dependencies>
=====================================
api/src/main/java/org/xnio/XnioWorker.java
=====================================
@@ -1188,6 +1188,8 @@ public abstract class XnioWorker extends AbstractExecutorService implements Conf
}
public XnioWorker build() {
+ log.debugf("Creating worker:%s, pool size:%s, max pool size:%s, keep alive:%s, io threads:%s, stack size:%s",
+ workerName, coreWorkerPoolSize, maxWorkerPoolSize, workerKeepAlive, workerIoThreads, workerStackSize);
return xnio.build(this);
}
}
=====================================
api/src/main/java/org/xnio/channels/Channels.java
=====================================
@@ -910,7 +910,7 @@ public final class Channels {
if (count == 0L) return total;
if (NULL_FILE_CHANNEL != null) {
while (count > 0) {
- if ((lres = channel.transferTo(0, count, NULL_FILE_CHANNEL)) == 0L) {
+ if ((lres = channel.transferTo(0, count, NULL_FILE_CHANNEL)) <= 0L) {
break;
}
total += lres;
@@ -950,7 +950,7 @@ public final class Channels {
if (count == 0L) return total;
if (NULL_FILE_CHANNEL != null) {
while (count > 0) {
- if ((lres = NULL_FILE_CHANNEL.transferFrom(channel, 0, count)) == 0L) {
+ if ((lres = NULL_FILE_CHANNEL.transferFrom(channel, 0, count)) <= 0L) {
break;
}
total += lres;
@@ -983,34 +983,31 @@ public final class Channels {
* @throws IOException if an error occurs
*/
public static long drain(FileChannel channel, long position, long count) throws IOException {
- if (channel instanceof StreamSourceChannel) {
- return drain((StreamSourceChannel) channel, count);
- } else {
- long total = 0L, lres;
- int ires;
- ByteBuffer buffer = null;
- for (;;) {
- if (count == 0L) return total;
- if (NULL_FILE_CHANNEL != null) {
- while (count > 0) {
- if ((lres = channel.transferTo(position, count, NULL_FILE_CHANNEL)) == 0L) {
- break;
- }
- total += lres;
- count -= lres;
+ long total = 0L, lres;
+ int ires;
+ ByteBuffer buffer = null;
+ for (;;) {
+ if (count == 0L) return total;
+ if (NULL_FILE_CHANNEL != null) {
+ while (count > 0) {
+ if ((lres = channel.transferTo(position, count, NULL_FILE_CHANNEL)) <= 0L) {
+ break;
}
- // jump out quick if we drained the fast way
- if (total > 0L) return total;
- }
- if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
- if ((long) buffer.limit() > count) buffer.limit((int) count);
- ires = channel.read(buffer);
- buffer.clear();
- switch (ires) {
- case -1: return total == 0L ? -1L : total;
- case 0: return total;
- default: total += (long) ires;
+ total += lres;
+ count -= lres;
+ position += lres;
}
+ // jump out quick if we drained the fast way
+ if (total > 0L) return total;
+ }
+ if (buffer == null) buffer = DRAIN_BUFFER.duplicate();
+ if ((long) buffer.limit() > count) buffer.limit((int) count);
+ ires = channel.read(buffer, position);
+ buffer.clear();
+ switch (ires) {
+ case -1: return total == 0L ? -1L : total;
+ case 0: return total;
+ default: total += (long) ires;
}
}
}
@@ -1095,7 +1092,7 @@ public final class Channels {
final String osName = System.getProperty("os.name", "unknown").toLowerCase(Locale.US);
try {
if (osName.contains("windows")) {
- return new FileOutputStream("NUL:").getChannel();
+ return new FileOutputStream("NUL").getChannel();
} else {
return new FileOutputStream("/dev/null").getChannel();
}
=====================================
api/src/main/java/org/xnio/conduits/ConduitStreamSinkChannel.java
=====================================
@@ -211,4 +211,12 @@ public final class ConduitStreamSinkChannel implements StreamSinkChannel, WriteL
throw new IllegalStateException(e);
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return "ConduitStreamSinkChannel reading from " + this.conduit;
+ }
}
=====================================
api/src/main/java/org/xnio/conduits/Conduits.java
=====================================
@@ -202,7 +202,7 @@ public final class Conduits {
if (count == 0L) return total;
if (NULL_FILE_CHANNEL != null) {
while (count > 0) {
- if ((lres = conduit.transferTo(0, count, NULL_FILE_CHANNEL)) == 0L) {
+ if ((lres = conduit.transferTo(0, count, NULL_FILE_CHANNEL)) <= 0L) {
break;
}
total += lres;
@@ -229,7 +229,7 @@ public final class Conduits {
final String osName = System.getProperty("os.name", "unknown").toLowerCase(Locale.US);
try {
if (osName.contains("windows")) {
- return new FileOutputStream("NUL:").getChannel();
+ return new FileOutputStream("NUL").getChannel();
} else {
return new FileOutputStream("/dev/null").getChannel();
}
=====================================
api/src/main/java/org/xnio/ssl/JsseSslConduitEngine.java
=====================================
@@ -691,7 +691,19 @@ final class JsseSslConduitEngine {
buffer.flip();
}
log.logf(FQCN, Logger.Level.TRACE, null, "Unwrapping %s into %s", buffer, unwrappedBuffer);
- return engine.unwrap(buffer, unwrappedBuffer);
+ try {
+ return engine.unwrap(buffer, unwrappedBuffer);
+ } catch (SSLHandshakeException e) {
+ // see XNIO-406 (TODO once the JDK bug is solved, remove this catch block)
+ // ugly hack. I tried to check for buffer has remaining in previous if block (after read attempt), such as
+ // if (!buffer.hasRemaining() && !engine.isInboundDone() && !engine.isOutboundDone() && engine.getDelegatedTask() == null) {
+ // return new SSLEngineResult(...)
+ // but that breaks other tests (for some known reasons), so it appears that the only workaround for now is catching the exception
+ if (e.getMessage().startsWith("Insufficient buffer remaining for AEAD cipher fragment (2).")) {
+ return new SSLEngineResult(SSLEngineResult.Status.BUFFER_UNDERFLOW, HandshakeStatus.NEED_UNWRAP,0, 0);
+ }
+ throw e;
+ }
}
/**
=====================================
api/src/test/java/org/xnio/channels/ChannelsTestCase.java
=====================================
@@ -19,24 +19,10 @@
package org.xnio.channels;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.xnio.AssertReadWrite.assertReadMessage;
-import static org.xnio.AssertReadWrite.assertWrittenMessage;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.xnio.Buffers;
import org.xnio.ChannelListener;
import org.xnio.Option;
@@ -45,6 +31,31 @@ import org.xnio.StreamConnection;
import org.xnio.mock.AcceptingChannelMock;
import org.xnio.mock.ConnectedStreamChannelMock;
import org.xnio.mock.MessageChannelMock;
+import org.xnio.mock.ReadableByteChannelMock;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongFunction;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.xnio.AssertReadWrite.assertReadMessage;
+import static org.xnio.AssertReadWrite.assertWrittenMessage;
/**
* Test for {@link Channels}.
@@ -69,7 +80,7 @@ public class ChannelsTestCase {
assertTrue(connectedChannelMock.isFlushed());
connectedChannelMock.enableFlush(false);
ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put("10".getBytes("UTF-8")).flip();
+ buffer.put("10".getBytes(UTF_8)).flip();
assertEquals(2, connectedChannelMock.write(buffer));
assertWrittenMessage(connectedChannelMock, "10");
assertFalse(connectedChannelMock.isFlushed());
@@ -88,7 +99,7 @@ public class ChannelsTestCase {
public void shutdownWritesBlocking() throws IOException, InterruptedException {
connectedChannelMock.enableFlush(false);
ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put("shutdown".getBytes("UTF-8")).flip();
+ buffer.put("shutdown".getBytes(UTF_8)).flip();
assertEquals(8, connectedChannelMock.write(buffer));
assertWrittenMessage(connectedChannelMock, "shutdown");
assertFalse(connectedChannelMock.isShutdownWrites());
@@ -107,7 +118,7 @@ public class ChannelsTestCase {
}
@Test
- public void writeBlocking() throws IOException, InterruptedException {
+ public void writeBlocking() throws InterruptedException {
connectedChannelMock.enableWrite(false);
WriteBlocking writeRunnable = new WriteBlocking(connectedChannelMock, "write this");
Thread writeThread = new Thread(writeRunnable);
@@ -123,7 +134,7 @@ public class ChannelsTestCase {
}
@Test
- public void writeBlockingWithTimeout() throws IOException, InterruptedException {
+ public void writeBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.enableWrite(false);
WriteBlocking writeRunnable = new WriteBlocking(connectedChannelMock, "write with timeout", 1000, TimeUnit.MICROSECONDS);
Thread writeThread = new Thread(writeRunnable);
@@ -139,7 +150,7 @@ public class ChannelsTestCase {
}
@Test
- public void writeBufferArrayBlocking() throws IOException, InterruptedException {
+ public void writeBufferArrayBlocking() throws InterruptedException {
connectedChannelMock.enableWrite(false);
WriteBufferArrayBlocking writeRunnable = new WriteBufferArrayBlocking(connectedChannelMock, "write", " this");
Thread writeThread = new Thread(writeRunnable);
@@ -155,7 +166,7 @@ public class ChannelsTestCase {
}
@Test
- public void writeBufferArrayBlockingWithTimeout() throws IOException, InterruptedException {
+ public void writeBufferArrayBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.enableWrite(false);
WriteBufferArrayBlocking writeRunnable = new WriteBufferArrayBlocking(connectedChannelMock, 1000,
TimeUnit.MILLISECONDS, "write", "with", "timeout");
@@ -172,7 +183,7 @@ public class ChannelsTestCase {
}
@Test
- public void sendBlocking() throws IOException, InterruptedException {
+ public void sendBlocking() throws InterruptedException {
connectedChannelMock.enableWrite(false);
SendBlocking sendRunnable = new SendBlocking(messageChannelMock, "send this");
Thread sendThread = new Thread(sendRunnable);
@@ -187,7 +198,7 @@ public class ChannelsTestCase {
}
@Test
- public void sendBlockingWithTimeout() throws IOException, InterruptedException {
+ public void sendBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.enableWrite(false);
SendBlocking sendRunnable = new SendBlocking(messageChannelMock, "send with timeout", 1000, TimeUnit.MICROSECONDS);
Thread sendThread = new Thread(sendRunnable);
@@ -204,7 +215,7 @@ public class ChannelsTestCase {
}
@Test
- public void sendBufferArrayBlocking() throws IOException, InterruptedException {
+ public void sendBufferArrayBlocking() throws InterruptedException {
connectedChannelMock.enableWrite(false);
SendBufferArrayBlocking sendRunnable = new SendBufferArrayBlocking(messageChannelMock, "send", " this");
Thread sendThread = new Thread(sendRunnable);
@@ -219,7 +230,7 @@ public class ChannelsTestCase {
}
@Test
- public void sendBufferArrayBlockingWithTimeout() throws IOException, InterruptedException {
+ public void sendBufferArrayBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.enableWrite(false);
SendBufferArrayBlocking sendRunnable = new SendBufferArrayBlocking(messageChannelMock, 1000,
TimeUnit.MILLISECONDS, "send", "with", "timeout");
@@ -237,7 +248,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlocking() throws IOException, InterruptedException {
+ public void readBlocking() throws InterruptedException {
connectedChannelMock.setReadData("read this");
ReadBlocking readRunnable = new ReadBlocking(connectedChannelMock);
Thread readThread = new Thread(readRunnable);
@@ -253,7 +264,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingToEmptyBuffer() throws IOException, InterruptedException {
+ public void readBlockingToEmptyBuffer() throws InterruptedException {
connectedChannelMock.setReadData("can't read this");
ReadBlocking readRunnable = new ReadBlocking(connectedChannelMock, Buffers.EMPTY_BYTE_BUFFER);
Thread readThread = new Thread(readRunnable);
@@ -264,7 +275,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingWithTimeout() throws IOException, InterruptedException {
+ public void readBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.setReadData("read with timeout");
ReadBlocking readRunnable = new ReadBlocking(connectedChannelMock, 100, TimeUnit.MILLISECONDS);
Thread readThread = new Thread(readRunnable);
@@ -280,7 +291,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingWithTimeoutToEmptyBuffer() throws IOException, InterruptedException {
+ public void readBlockingWithTimeoutToEmptyBuffer() throws InterruptedException {
connectedChannelMock.setReadData("can't read this");
ReadBlocking readRunnable = new ReadBlocking(connectedChannelMock, 100, TimeUnit.MILLISECONDS, Buffers.EMPTY_BYTE_BUFFER);
Thread readThread = new Thread(readRunnable);
@@ -295,7 +306,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingToBufferArray() throws IOException, InterruptedException {
+ public void readBlockingToBufferArray() throws InterruptedException {
connectedChannelMock.setReadData("read", "this");
ReadToBufferArrayBlocking readRunnable = new ReadToBufferArrayBlocking(connectedChannelMock);
Thread readThread = new Thread(readRunnable);
@@ -315,7 +326,7 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingToBufferArrayWithTimeout() throws IOException, InterruptedException {
+ public void readBlockingToBufferArrayWithTimeout() throws InterruptedException {
connectedChannelMock.setReadData("read", "with", "timeout");
ReadToBufferArrayBlocking readRunnable = new ReadToBufferArrayBlocking(connectedChannelMock, 1000,
TimeUnit.MILLISECONDS);
@@ -336,13 +347,13 @@ public class ChannelsTestCase {
}
@Test
- public void readBlockingToEmptyBufferArrayWithTimeout() throws IOException, InterruptedException {
+ public void readBlockingToEmptyBufferArrayWithTimeout() throws IOException {
connectedChannelMock.setReadData("can't read this");
assertEquals(0, Channels.readBlocking(connectedChannelMock, new ByteBuffer[0], 0, 0, 2, TimeUnit.MINUTES));
}
@Test
- public void receiveBlocking() throws IOException, InterruptedException {
+ public void receiveBlocking() throws InterruptedException {
connectedChannelMock.setReadData("receive this");
ReceiveBlocking receiveRunnable = new ReceiveBlocking(messageChannelMock);
Thread receiveThread = new Thread(receiveRunnable);
@@ -358,7 +369,7 @@ public class ChannelsTestCase {
}
@Test
- public void receiveBlockingWithTimeout() throws IOException, InterruptedException {
+ public void receiveBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.setReadData("receive with timeout");
ReceiveBlocking receiveRunnable = new ReceiveBlocking(messageChannelMock, 100, TimeUnit.MILLISECONDS);
Thread receiveThread = new Thread(receiveRunnable);
@@ -374,7 +385,7 @@ public class ChannelsTestCase {
}
@Test
- public void receiveBufferArrayBlocking() throws IOException, InterruptedException {
+ public void receiveBufferArrayBlocking() throws InterruptedException {
connectedChannelMock.setReadData("receive", "this");
ReceiveBufferArrayBlocking receiveRunnable = new ReceiveBufferArrayBlocking(messageChannelMock);
Thread receiveThread = new Thread(receiveRunnable);
@@ -394,7 +405,7 @@ public class ChannelsTestCase {
}
@Test
- public void receiveBufferArrayBlockingWithTimeout() throws IOException, InterruptedException {
+ public void receiveBufferArrayBlockingWithTimeout() throws InterruptedException {
connectedChannelMock.setReadData("receive", "with", "timeout");
ReceiveBufferArrayBlocking receiveRunnable = new ReceiveBufferArrayBlocking(messageChannelMock, 1000,
TimeUnit.MILLISECONDS);
@@ -417,7 +428,7 @@ public class ChannelsTestCase {
@Test
public void acceptBlocking() throws IOException, InterruptedException {
final AcceptingChannelMock acceptingChannelMock = new AcceptingChannelMock();
- final AcceptBlocking<?> acceptBlockingRunnable = new AcceptBlocking<StreamConnection>(acceptingChannelMock);
+ final AcceptBlocking<?> acceptBlockingRunnable = new AcceptBlocking<>(acceptingChannelMock);
final Thread acceptChannelThread = new Thread(acceptBlockingRunnable);
assertNotNull(Channels.acceptBlocking(acceptingChannelMock));
assertFalse(acceptingChannelMock.haveWaitedAcceptable());
@@ -437,7 +448,7 @@ public class ChannelsTestCase {
@Test
public void acceptBlockingWithTimeout() throws IOException, InterruptedException {
final AcceptingChannelMock acceptingChannelMock = new AcceptingChannelMock();
- final AcceptBlocking<?> acceptBlockingRunnable = new AcceptBlocking<StreamConnection>(acceptingChannelMock, 10, TimeUnit.SECONDS);
+ final AcceptBlocking<?> acceptBlockingRunnable = new AcceptBlocking<>(acceptingChannelMock, 10, TimeUnit.SECONDS);
final Thread acceptChannelThread = new Thread(acceptBlockingRunnable);
// try to accept blocking with acceptance enabled at accepting channel mock
assertNotNull(Channels.acceptBlocking(acceptingChannelMock, 1, TimeUnit.SECONDS));
@@ -447,7 +458,7 @@ public class ChannelsTestCase {
acceptChannelThread.start();
acceptChannelThread.join(200);
assertFalse(acceptChannelThread.isAlive());
- // thread is supposed to have finished, after having invoked awaitAcceptable at acceptingchannelMock with 10s timeout
+ // thread is supposed to have finished, after having invoked awaitAcceptable at acceptingChannelMock with 10s timeout
assertTrue(acceptingChannelMock.haveWaitedAcceptable());
assertEquals(10, acceptingChannelMock.getAwaitAcceptableTime());
assertEquals(TimeUnit.SECONDS, acceptingChannelMock.getAwaitAcceptableTimeUnit());
@@ -515,7 +526,7 @@ public class ChannelsTestCase {
final FileChannel fileChannel = randomAccessFile.getChannel();
try {
final ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put("test".getBytes("UTF-8")).flip();
+ buffer.put("test".getBytes(UTF_8)).flip();
assertEquals(4, fileChannel.write(buffer));
fileChannel.position(0);
Channels.transferBlocking(channelMock, fileChannel, 0, 4);
@@ -536,7 +547,7 @@ public class ChannelsTestCase {
final FileChannel fileChannel = randomAccessFile.getChannel();
try {
final ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put("test12345".getBytes("UTF-8")).flip();
+ buffer.put("test12345".getBytes(UTF_8)).flip();
assertEquals(9, fileChannel.write(buffer));
fileChannel.position(0);
@@ -556,9 +567,7 @@ public class ChannelsTestCase {
@Test
public void setChannelListeners() {
final ConnectedStreamChannelMock channelMock = new ConnectedStreamChannelMock();
- final ChannelListener<ConnectedStreamChannel> channelListener = new ChannelListener<ConnectedStreamChannel>() {
- public void handleEvent(final ConnectedStreamChannel channel) {}
- };
+ final ChannelListener<ConnectedStreamChannel> channelListener = channel -> {};
// test setReadListener
Channels.setReadListener(channelMock, channelListener);
@@ -582,9 +591,7 @@ public class ChannelsTestCase {
@Test
public void setAcceptListener() {
final AcceptingChannelMock channelMock = new AcceptingChannelMock();
- final ChannelListener<AcceptingChannel<StreamConnection>> channelListener = new ChannelListener<AcceptingChannel<StreamConnection>>() {
- public void handleEvent(final AcceptingChannel<StreamConnection> channel) {}
- };
+ final ChannelListener<AcceptingChannel<StreamConnection>> channelListener = channel -> {};
Channels.setAcceptListener(channelMock, channelListener);
assertSame(channelListener, channelMock.getAcceptListener());
@@ -629,16 +636,16 @@ public class ChannelsTestCase {
assertReadMessage(bufferArray[5]);
// test write(ByteBuffer)
buffer.clear();
- buffer.put("write".getBytes("UTF-8")).flip();
+ buffer.put("write".getBytes(UTF_8)).flip();
wrappedByteChannel.write(buffer);
assertWrittenMessage(channelMock, "write");
// test write(ByteBuffer[])
for(ByteBuffer bufferItem: bufferArray) {
bufferItem.clear();
}
- bufferArray[0].put("writ".getBytes("UTF-8")).flip();
- bufferArray[1].put("e_ag".getBytes("UTF-8")).flip();
- bufferArray[2].put("ain".getBytes("UTF-8")).flip();
+ bufferArray[0].put("writ".getBytes(UTF_8)).flip();
+ bufferArray[1].put("e_ag".getBytes(UTF_8)).flip();
+ bufferArray[2].put("ain".getBytes(UTF_8)).flip();
bufferArray[3].flip();
bufferArray[4].flip();
bufferArray[5].flip();
@@ -692,13 +699,14 @@ public class ChannelsTestCase {
assertEquals(1000, Channels.getOption(channelMock, Options.MAX_OUTBOUND_MESSAGE_SIZE, 1000));
assertEquals(5000, Channels.getOption(brokenConfigurable, Options.SSL_CLIENT_SESSION_TIMEOUT, 5000));
// long type option
- assertEquals(1l, Channels.getOption(channelMock, Options.STACK_SIZE, 1l));
- channelMock.setOption(Options.STACK_SIZE, 50000l);
- assertEquals(50000l, Channels.getOption(channelMock, Options.STACK_SIZE, 100));
+ assertEquals(1L, Channels.getOption(channelMock, Options.STACK_SIZE, 1L));
+ channelMock.setOption(Options.STACK_SIZE, 50000L);
+ assertEquals(50000L, Channels.getOption(channelMock, Options.STACK_SIZE, 100));
assertEquals(100, Channels.getOption(brokenConfigurable, Options.STACK_SIZE, 100));
}
@Test
+ @SuppressWarnings( "deprecation" )
public void unwrap() {
assertNull(Channels.unwrap(ConnectedStreamChannelMock.class, null));
final ConnectedStreamChannelMock channelMock = new ConnectedStreamChannelMock();
@@ -708,6 +716,207 @@ public class ChannelsTestCase {
assertNull(Channels.unwrap(FramedMessageChannel.class, channelMock));
}
+ @Test
+ public void drainStreamSourceChannel() throws IOException {
+ ConnectedStreamChannelMock channelMock = new ConnectedStreamChannelMock();
+ assertDrain(channelMock, (long count)-> {
+ try {
+ return Channels.drain(channelMock, count);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void drainStreamSourceChannelAsReadableByteChannel() throws IOException {
+ ReadableByteChannelMock channelMock = new ConnectedStreamChannelMock();
+ assertDrain(channelMock, (long count)-> {
+ try {
+ return Channels.drain(channelMock, count);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void drainReadableByteChannel() throws IOException {
+ ReadableByteChannelMock channelMock = new ReadableByteChannelMock();
+ assertDrain(channelMock, (long count)-> {
+ try {
+ return Channels.drain(channelMock, count);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void drainFileChannel() throws IOException {
+ final File file1 = folder.newFile();
+ //write out data to the test files
+ final FileWriter fw1 = new FileWriter( file1 );
+ final BufferedWriter bw1 = new BufferedWriter( fw1 );
+ for (int i = 0; i < 5; i++)
+ bw1.write( "read data\n");
+ bw1.close();
+ fw1.close();
+ for (Method method: FileChannel.class.getDeclaredMethods())
+ if (Modifier.isStatic(method.getModifiers()))
+ System.out.println("method: " + method);
+ final FileChannel fileChannel = new FileInputStream(file1).getChannel();
+ try
+ {
+ // test drain 0
+ assertEquals(0, Channels.drain(fileChannel, 0));
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ fileChannel.read(buffer, 0);
+ assertReadMessage(buffer, "read data\n");
+
+ // test drain negative
+ buffer.clear();
+ boolean failed = false;
+ try {
+ assertEquals(0, Channels.drain(fileChannel, 10, -3));
+ } catch (IllegalArgumentException illegalArgumentException) {
+ failed = true;
+ }
+ assertTrue(failed);
+ assertEquals(10, fileChannel.read(buffer, 10));
+ assertReadMessage(buffer, "read data\n");
+
+ // test drain 2
+ assertEquals(2, Channels.drain(fileChannel, 20,2));
+ buffer.clear();
+ buffer.limit(8);
+ assertEquals(8, fileChannel.read(buffer, 22));
+ assertReadMessage(buffer, "ad data\n");
+
+ // test drain little by little
+ buffer.clear();
+ buffer.limit(4);
+ assertEquals(1, Channels.drain(fileChannel, 30, 1));
+ assertEquals(2, Channels.drain(fileChannel, 31,2));
+ assertEquals(3, Channels.drain(fileChannel, 33,3));
+ assertEquals(4, fileChannel.read(buffer, 36));
+ assertReadMessage(buffer, "ata\n");
+
+ // test drain more bytes than available
+ buffer.clear();
+ assertEquals(10, Channels.drain(fileChannel, 40, 11));
+
+ // test drain an already drained channel
+ assertEquals(-1, Channels.drain(fileChannel, 50, Long.MAX_VALUE));
+ assertEquals(-1, fileChannel.read(buffer, 50));
+ } finally {
+ fileChannel.close();
+ }
+ boolean failed = false;
+ try {
+ Channels.drain(fileChannel, Long.MAX_VALUE);
+ } catch (ClosedChannelException e) {
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+
+ private void assertDrain(ReadableByteChannelMock channelMock, LongFunction<Long> drainFunction) throws IOException {
+ channelMock.enableClosedCheck(true);
+ // test drain 0
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ assertEquals(0L, (long) drainFunction.apply(0));
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ channelMock.read(buffer);
+ assertReadMessage(buffer, "read", "data");
+
+ // test drain negative
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ boolean failed = false;
+ try {
+ drainFunction.apply(-5);
+ } catch (IllegalArgumentException illegalArgumentException) {
+ failed = true;
+ }
+ assertTrue(failed);
+ assertEquals(8, channelMock.read(buffer));
+ assertReadMessage(buffer, "read", "data");
+
+ // test drain 2
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ assertEquals(2L, (long) drainFunction.apply(2));
+ buffer.clear();
+ assertEquals(6, channelMock.read(buffer));
+ assertReadMessage(buffer, "ad", "data");
+
+ // test drain little by little
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ assertEquals(1L, (long) drainFunction.apply(1));
+ assertEquals(2L, (long) drainFunction.apply(2));
+ assertEquals(3L, (long) drainFunction.apply(3));
+ assertEquals(2, channelMock.read(buffer));
+ assertReadMessage(buffer, "ta");
+
+ // test drain the exact amount of bytes left
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ assertEquals(8L, (long) drainFunction.apply(8));
+ assertEquals(0, channelMock.read(buffer));
+
+ // test drain more bytes than available
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ assertEquals(8L, (long) drainFunction.apply(9));
+ assertEquals(0, channelMock.read(buffer));
+
+ // test drain the exact amount of bytes left without reading the EOF
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ channelMock.setEof();
+ buffer.clear();
+ assertEquals(8L, (long) drainFunction.apply(8));
+ assertEquals(-1, channelMock.read(buffer));
+
+ // test drain more bytes than available with eof
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ assertEquals(8L, (long) drainFunction.apply(9));
+ assertEquals(-1, channelMock.read(buffer));
+
+ // test drain with long max (Undertow usage)
+ channelMock.setReadData("read", "data");
+ channelMock.enableRead(true);
+ buffer.clear();
+ assertEquals(8L, (long) drainFunction.apply(Long.MAX_VALUE));
+ assertEquals(-1, channelMock.read(buffer));
+
+ // test drain an already drained channel
+ assertEquals(-1L, (long) drainFunction.apply(Long.MAX_VALUE));
+ assertEquals(-1, channelMock.read(buffer));
+
+ channelMock.close();
+ failed = false;
+ try {
+ drainFunction.apply(Long.MAX_VALUE);
+ } catch (RuntimeException e) {
+ assertTrue(e.getCause() instanceof ClosedChannelException);
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+
public static class FlushBlocking implements Runnable {
private final SuspendableWriteChannel channel;
@@ -765,7 +974,7 @@ public class ChannelsTestCase {
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(30);
try {
- buffer.put(message.getBytes("UTF-8")).flip();
+ buffer.put(message.getBytes(UTF_8)).flip();
if (timeoutUnit != null) {
writeResult = Channels.writeBlocking(channel, buffer, timeout, timeoutUnit);
} else {
@@ -805,7 +1014,7 @@ public class ChannelsTestCase {
try {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = ByteBuffer.allocate(message[i].length());
- buffer[i].put(message[i].getBytes("UTF-8")).flip();
+ buffer[i].put(message[i].getBytes(UTF_8)).flip();
}
if (timeoutUnit != null) {
writeResult = Channels.writeBlocking(channel, buffer, 0, buffer.length, timeout, timeoutUnit);
@@ -844,7 +1053,7 @@ public class ChannelsTestCase {
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(30);
try {
- buffer.put(message.getBytes("UTF-8")).flip();
+ buffer.put(message.getBytes(UTF_8)).flip();
if (timeoutUnit != null) {
sendResult = Channels.sendBlocking(channel, buffer, timeout, timeoutUnit);
} else {
@@ -884,7 +1093,7 @@ public class ChannelsTestCase {
try {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = ByteBuffer.allocate(message[i].length());
- buffer[i].put(message[i].getBytes("UTF-8")).flip();
+ buffer[i].put(message[i].getBytes(UTF_8)).flip();
}
if (timeoutUnit != null) {
sendResult = Channels.sendBlocking(channel, buffer, 0, buffer.length, timeout, timeoutUnit);
=====================================
api/src/test/java/org/xnio/conduits/ConduitsTestCase.java
=====================================
@@ -0,0 +1,305 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * Copyright 2022 Red Hat, Inc. and/or its affiliates, and individual
+ * contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.conduits;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.xnio.mock.ConduitMock;
+import org.xnio.mock.MessageConduitMock;
+import org.xnio.mock.XnioIoThreadMock;
+import org.xnio.mock.XnioWorkerMock;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.wildfly.common.Assert.assertTrue;
+import static org.xnio.AssertReadWrite.assertReadMessage;
+import static org.xnio.AssertReadWrite.assertWrittenMessage;
+
+/**
+ * Test for {@link Conduits}.
+ *
+ * @author <a href="mailto:frainone at redhat.com">Flavia Rainone</a>
+ */
+public class ConduitsTestCase {
+
+ private ConduitMock conduitMock;
+ private MessageConduitMock messageConduitMock;
+
+
+ @Before
+ public void init() {
+ final XnioWorkerMock worker = new XnioWorkerMock();
+ final XnioIoThreadMock threadMock = worker.chooseThread();
+ threadMock.start();
+ conduitMock = new ConduitMock(worker, threadMock);
+ messageConduitMock = new MessageConduitMock(worker, threadMock);
+ }
+
+ @Test
+ public void transferToFile1() throws IOException {
+ conduitMock.setReadData("test");
+ conduitMock.enableReads(true);
+ final File file = File.createTempFile("test", ".txt");
+ file.deleteOnExit();
+ final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ final FileChannel fileChannel = randomAccessFile.getChannel();
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ try {
+ assertEquals(0, Conduits.transfer(conduitMock, 0, buffer, fileChannel));
+ fileChannel.position(0);
+ assertEquals(buffer.position(), buffer.limit());
+ buffer.compact();
+ fileChannel.read(buffer);
+ assertReadMessage(buffer, "");
+ assertEquals(4, Conduits.transfer(conduitMock, 4, buffer, fileChannel));
+ fileChannel.position(0);
+ assertEquals(buffer.position(), buffer.limit());
+ buffer.compact();
+ fileChannel.read(buffer);
+ assertReadMessage(buffer, "test");
+ } finally {
+ fileChannel.close();
+ randomAccessFile.close();
+ }
+ }
+
+ @Test
+ public void transferToFile2() throws IOException {
+ conduitMock.setReadData("test", "12345");
+ final File file = File.createTempFile("test", ".txt");
+ file.deleteOnExit();
+ final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ final FileChannel fileChannel = randomAccessFile.getChannel();
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ try {
+ assertEquals(0, Conduits.transfer(conduitMock, 8, buffer, fileChannel));
+ fileChannel.position(0);
+ assertEquals(buffer.position(), buffer.limit());
+ buffer.compact();
+ fileChannel.read(buffer);
+ assertReadMessage(buffer, "");
+ conduitMock.enableReads(true);
+ assertEquals(8, Conduits.transfer(conduitMock, 8, buffer, fileChannel));
+ fileChannel.position(0);
+ buffer.compact();
+ fileChannel.read(buffer);
+ assertReadMessage(buffer, "test", "1234");
+ } finally {
+ fileChannel.close();
+ randomAccessFile.close();
+ }
+ }
+
+ @Test
+ public void transferFromFile1() throws IOException {
+ final File file = File.createTempFile("test", ".txt");
+ file.deleteOnExit();
+ final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ final FileChannel fileChannel = randomAccessFile.getChannel();
+ try {
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put("test".getBytes(UTF_8)).flip();
+ assertEquals(4, fileChannel.write(buffer));
+ buffer.compact();
+ fileChannel.position(0);
+ assertEquals(4, Conduits.transfer(fileChannel, 4, buffer, conduitMock));
+ assertFalse(buffer.hasRemaining());
+ assertWrittenMessage(conduitMock, "test");
+ } finally {
+ fileChannel.close();
+ randomAccessFile.close();
+ }
+ }
+
+ @Test
+ public void transferFromFile2() throws IOException {
+ conduitMock.enableWrites(false);
+ final File file = File.createTempFile("test", ".txt");
+ file.deleteOnExit();
+ final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ final FileChannel fileChannel = randomAccessFile.getChannel();
+ try {
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put("test12345".getBytes(UTF_8)).flip();
+ assertEquals(9, fileChannel.write(buffer));
+ fileChannel.position(0);
+ buffer.compact();
+ assertEquals(0, Conduits.transfer(fileChannel, 8, buffer, conduitMock));
+ assertWrittenMessage(conduitMock, "");
+ conduitMock.enableWrites(true);
+ assertEquals(8, buffer.remaining());
+ conduitMock.write(buffer);
+ assertWrittenMessage(conduitMock, "test", "1234");
+ buffer.compact();
+ assertEquals(1, Conduits.transfer(fileChannel, 8, buffer, conduitMock));
+ assertWrittenMessage(conduitMock, "test", "12345");
+ } finally {
+ fileChannel.close();
+ randomAccessFile.close();
+ }
+ }
+
+ @Test
+ public void writeFinalBasic() throws IOException {
+ conduitMock.enableWrites(false);
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put("write this".getBytes(UTF_8)).flip();
+ assertEquals(0, Conduits.writeFinalBasic(conduitMock, buffer));
+ assertFalse(conduitMock.isWriteShutdown());
+ conduitMock.enableWrites(true);
+ assertEquals(10, Conduits.writeFinalBasic(conduitMock, buffer));
+ assertWrittenMessage(conduitMock, "write this");
+ assertTrue(conduitMock.isWriteShutdown());
+ assertFalse(conduitMock.isFlushed());
+ }
+
+ @Test
+ public void writeFinalBasicBufferArray() throws IOException {
+ conduitMock.enableWrites(false);
+ ByteBuffer[] bufferArray = new ByteBuffer[]{ByteBuffer.allocate(5), ByteBuffer.allocate(4)};
+ bufferArray[0].put("write".getBytes(UTF_8)).flip();
+ bufferArray[1].put("this".getBytes(UTF_8)).flip();
+ assertEquals(0, Conduits.writeFinalBasic(conduitMock, bufferArray, 0, 2));
+ assertFalse(conduitMock.isWriteShutdown());
+ conduitMock.enableWrites(true);
+ assertEquals(9, Conduits.writeFinalBasic(conduitMock, bufferArray, 0, 2));
+ assertWrittenMessage(conduitMock, "writethis");
+ assertTrue(conduitMock.isWriteShutdown());
+ assertFalse(conduitMock.isFlushed());
+ }
+
+ @Test
+ public void sendFinalBasic() throws IOException {
+ messageConduitMock.enableWrites(false);
+ final ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put("send this".getBytes(UTF_8)).flip();
+ assertFalse(Conduits.sendFinalBasic(messageConduitMock, buffer));
+ assertFalse(messageConduitMock.isWriteShutdown());
+ messageConduitMock.enableWrites(true);
+ assertTrue(Conduits.sendFinalBasic(messageConduitMock, buffer));
+ assertWrittenMessage(messageConduitMock, "send this");
+ assertTrue(messageConduitMock.isWriteShutdown());
+ assertFalse(messageConduitMock.isFlushed());
+ }
+
+ @Test
+ public void sendFinalBasicBufferArray() throws IOException {
+ messageConduitMock.enableWrites(false);
+ ByteBuffer[] bufferArray = new ByteBuffer[]{ByteBuffer.allocate(4), ByteBuffer.allocate(4)};
+ bufferArray[0].put("send".getBytes(UTF_8)).flip();
+ bufferArray[1].put("this".getBytes(UTF_8)).flip();
+ assertFalse(Conduits.sendFinalBasic(messageConduitMock, bufferArray, 0, 2));
+ messageConduitMock.enableWrites(true);
+ assertTrue(Conduits.sendFinalBasic(messageConduitMock, bufferArray, 0, 2));
+ assertWrittenMessage(messageConduitMock, "sendthis");
+ assertTrue(messageConduitMock.isWriteShutdown());
+ assertFalse(messageConduitMock.isFlushed());
+ }
+
+ @Test
+ public void drain() throws IOException {
+ // test drain 0
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ assertEquals(0, Conduits.drain(conduitMock, 0));
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ conduitMock.read(buffer);
+ assertReadMessage(buffer, "read", "data");
+
+ // test drain negative
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ boolean failed = false;
+ try {
+ Conduits.drain(conduitMock, -5);
+ } catch (IllegalArgumentException illegalArgumentException) {
+ failed = true;
+ }
+ assertTrue(failed);
+ assertEquals(8, conduitMock.read(buffer));
+ assertReadMessage(buffer, "read", "data");
+
+ // test drain 2
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ assertEquals(2, Conduits.drain(conduitMock, 2));
+ buffer.clear();
+ assertEquals(6, conduitMock.read(buffer));
+ assertReadMessage(buffer, "ad", "data");
+
+ // test drain little by little
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ assertEquals(1, Conduits.drain(conduitMock, 1));
+ assertEquals(2, Conduits.drain(conduitMock, 2));
+ assertEquals(3, Conduits.drain(conduitMock, 3));
+ assertEquals(2, conduitMock.read(buffer));
+ assertReadMessage(buffer, "ta");
+
+ // test drain the exact amount of bytes left
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ assertEquals(8, Conduits.drain(conduitMock, 8));
+ assertEquals(0, conduitMock.read(buffer));
+
+ // test drain more bytes than available
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ assertEquals(8, Conduits.drain(conduitMock, 9));
+ assertEquals(0, conduitMock.read(buffer));
+
+ // test drain the exact amount of bytes left without reading the EOF
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ conduitMock.setEof();
+ buffer.clear();
+ assertEquals(8, Conduits.drain(conduitMock, 8));
+ assertEquals(-1, conduitMock.read(buffer));
+
+ // test drain more bytes than available with eof
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ assertEquals(8, Conduits.drain(conduitMock, 9));
+ assertEquals(-1, conduitMock.read(buffer));
+
+ // test drain with long max (Undertow usage)
+ conduitMock.setReadData("read", "data");
+ conduitMock.enableReads(true);
+ buffer.clear();
+ assertEquals(8, Conduits.drain(conduitMock, Long.MAX_VALUE));
+ assertEquals(-1, conduitMock.read(buffer));
+
+ // test drain an already drained channel
+ assertEquals(-1, Conduits.drain(conduitMock, Long.MAX_VALUE));
+ assertEquals(-1, conduitMock.read(buffer));
+ }
+}
=====================================
api/src/test/java/org/xnio/mock/ConnectedStreamChannelMock.java
=====================================
@@ -20,7 +20,6 @@
package org.xnio.mock;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -49,32 +48,25 @@ import org.xnio.channels.StreamSourceChannel;
*
* @author <a href="mailto:flavia.rainone at jboss.com">Flavia Rainone</a>
*/
-public class ConnectedStreamChannelMock implements ConnectedStreamChannel, StreamSourceChannel, StreamSinkChannel, Mock{
+public class ConnectedStreamChannelMock extends ReadableByteChannelMock implements ConnectedStreamChannel, StreamSourceChannel, StreamSinkChannel, Mock {
+ private OptionMap optionMap;
+ private Thread readWaiter;
// written stuff will be copied to this buffer
- private ByteBuffer writeBuffer = ByteBuffer.allocate(1000);
- // read stuff will be taken from this buffer
- private ByteBuffer readBuffer = ByteBuffer.allocate(10000);
- // read stuff can only be read if read is enabled
- private boolean readEnabled;
+ private final ByteBuffer writeBuffer = ByteBuffer.allocate(1000);
// can only write when write is enabled
private boolean writeEnabled = true;
- // indicates if this channel is closed
- private boolean closed = false;
- private boolean checkClosed = true;
private boolean writeResumed = false;
private boolean writeAwaken = false;
private boolean readAwaken = false;
private boolean readResumed = false;
private boolean readsDown = false;
private boolean writesDown = false;
- private boolean allowShutdownWrites = true;
+ private final boolean allowShutdownWrites = true;
private boolean flushed = true;
private boolean flushEnabled = true;
- private boolean eof = false;
private XnioWorker worker = new XnioWorkerMock();
- private XnioIoThread executor = new XnioIoThreadMock(null);
- private Thread readWaiter;
+ private final XnioIoThread executor = new XnioIoThreadMock(null);
private Thread writeWaiter;
private ChannelListener<? super ConnectedStreamChannel> readListener;
private ChannelListener<? super ConnectedStreamChannel> writeListener;
@@ -82,119 +74,48 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
private String info = null; // any extra information regarding this channel used by tests
// listener setters
- private final ChannelListener.Setter<ConnectedStreamChannel> readListenerSetter = new ChannelListener.Setter<ConnectedStreamChannel>() {
+ private final Setter<ConnectedStreamChannel> readListenerSetter = new Setter<ConnectedStreamChannel>() {
@Override
public void set(ChannelListener<? super ConnectedStreamChannel> listener) {
readListener = listener;
}
};
- private final ChannelListener.Setter<ConnectedStreamChannel> writeListenerSetter = new ChannelListener.Setter<ConnectedStreamChannel>() {
+ private final Setter<ConnectedStreamChannel> writeListenerSetter = new Setter<ConnectedStreamChannel>() {
@Override
public void set(ChannelListener<? super ConnectedStreamChannel> listener) {
writeListener = listener;
}
};
- private final ChannelListener.Setter<ConnectedStreamChannel> closeListenerSetter = new ChannelListener.Setter<ConnectedStreamChannel>() {
+ private final Setter<ConnectedStreamChannel> closeListenerSetter = new Setter<ConnectedStreamChannel>() {
@Override
public void set(ChannelListener<? super ConnectedStreamChannel> listener) {
closeListener = listener;
}
};
-
/**
* Feeds {@code readData} to read clients.
* @param readData data that will be available for reading on this channel mock
*/
public void setReadData(String... readData) {
final Thread waiter;
-
synchronized (this) {
- int totalLength = 0;
- for (String data: readData) {
- totalLength += data.length();
- }
- int position = readBuffer.position();
- boolean resetPosition = false;
- if (!readBuffer.hasRemaining()) {
- readBuffer.compact();
- } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
- if (readBuffer.capacity() - readBuffer.limit() < totalLength) {
- if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() >= totalLength) {
- readBuffer.compact();
- }
- throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
- }
- int limit = readBuffer.limit();
- readBuffer.position(limit);
- readBuffer.limit(limit += totalLength);
- resetPosition = true;
- }
- for (String data: readData) {
- try {
- readBuffer.put(data.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
+ int totalLength = super.doSetReadData(readData);
+ if (readWaiter == null || totalLength == 0 || !readEnabled) {
+ return;
}
- }
- readBuffer.flip();
- if (resetPosition) {
- readBuffer.position(position);
- }
-
- if (readWaiter == null || totalLength == 0 || !readEnabled) {
- return;
- }
- waiter = readWaiter;
- readWaiter = null;
+ waiter = readWaiter;
+ readWaiter = null;
}
LockSupport.unpark(waiter);
}
- /**
- * Feeds {@code readData} to read clients.
- * @param readData data that will be available for reading on this channel mock
- */
public void setReadDataWithLength(String... readData) {
final Thread waiter;
synchronized (this) {
- if (eof == true) {
- throw new IllegalStateException("Cannot add read data once eof is set");
- }
- int totalLength = 0;
- for (String data: readData) {
- totalLength += data.length();
- }
- int position = readBuffer.position();
- boolean resetPosition = false;
- if (!readBuffer.hasRemaining()) {
- readBuffer.compact();
- } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
- if (readBuffer.capacity() - readBuffer.limit() + 4 < totalLength) {
- if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() + 4 >= totalLength) {
- readBuffer.compact();
- }
- throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
- }
- int limit = readBuffer.limit();
- readBuffer.position(limit);
- readBuffer.limit(limit += totalLength + 4);
- resetPosition = true;
- }
- readBuffer.putInt(totalLength);
- for (String data: readData) {
- try {
- readBuffer.put(data.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- readBuffer.flip();
- if (resetPosition) {
- readBuffer.position(position);
- }
+ int totalLength = doSetReadDataWithLength(readData);
if (readWaiter == null || totalLength == 0 || !readEnabled) {
return;
}
@@ -203,48 +124,10 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
LockSupport.unpark(waiter);
}
- /**
- * Feeds {@code readData} to read clients.
- * @param readData data that will be available for reading on this channel mock
- */
public void setReadDataWithLength(int length, String... readData) {
final Thread waiter;
synchronized (this) {
- if (eof == true) {
- throw new IllegalStateException("Cannot add read data once eof is set");
- }
- int totalLength = 0;
- for (String data: readData) {
- totalLength += data.length();
- }
- int position = readBuffer.position();
- boolean resetPosition = false;
- if (!readBuffer.hasRemaining()) {
- readBuffer.compact();
- } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
- if (readBuffer.capacity() - readBuffer.limit() + 4 < totalLength) {
- if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() + 4 >= totalLength) {
- readBuffer.compact();
- }
- throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
- }
- int limit = readBuffer.limit();
- readBuffer.position(limit);
- readBuffer.limit(limit += totalLength + 4);
- resetPosition = true;
- }
- readBuffer.putInt(length);
- for (String data: readData) {
- try {
- readBuffer.put(data.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- readBuffer.flip();
- if (resetPosition) {
- readBuffer.position(position);
- }
+ int totalLength = doSetReadDataWithLength(length, readData);
if (readWaiter == null || totalLength == 0 || !readEnabled) {
return;
}
@@ -256,7 +139,7 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
public void setEof() {
final Thread waiter;
synchronized (this) {
- eof = true;
+ super.setEof();
if (readWaiter == null || !readEnabled) {
return;
}
@@ -265,10 +148,27 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
LockSupport.unpark(waiter);
}
+ /**
+ * Returns all the bytes that have been written to this channel mock.
+ *
+ * @return the written bytes in the form of a UTF-8 string
+ */
+ public String getWrittenText() {
+ if (writeBuffer.position() == 0 && writeBuffer.limit() == writeBuffer.capacity()) {
+ return "";
+ }
+ writeBuffer.flip();
+ return Buffers.getModifiedUtf8(writeBuffer);
+ }
+
+ public ByteBuffer getWrittenBytes() {
+ return writeBuffer;
+ }
+
public void enableRead(boolean enable) {
final Thread waiter;
synchronized (this) {
- readEnabled = enable;
+ super.enableRead(enable);
if (readWaiter == null || !readEnabled || !((readBuffer.hasRemaining() && readBuffer.limit() != readBuffer.capacity()) || eof)) {
return;
}
@@ -288,44 +188,9 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
}
}
- public synchronized void enableClosedCheck(boolean enable) {
- checkClosed = enable;
- }
-
- /**
- * Returns all the bytes that have been written to this channel mock.
- *
- * @return the written bytes in the form of a UTF-8 string
- */
- public String getWrittenText() {
- if (writeBuffer.position() == 0 && writeBuffer.limit() == writeBuffer.capacity()) {
- return "";
- }
- writeBuffer.flip();
- return Buffers.getModifiedUtf8(writeBuffer);
- }
-
- public ByteBuffer getWrittenBytes() {
- return writeBuffer;
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- shutdownWrites();
- shutdownReads();
- }
-
- @Override
- public boolean isOpen() {
- return !closed;
- }
-
- private OptionMap optionMap;
-
@Override
public boolean supportsOption(Option<?> option) {
- return optionMap == null? false: optionMap.contains(option);
+ return optionMap != null && optionMap.contains(option);
}
@Override
@@ -374,9 +239,8 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
@Override
public void shutdownReads() throws IOException {
readsDown = true;
- return;
}
-
+
public boolean isShutdownReads() {
return readsDown;
}
@@ -457,14 +321,13 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
writesDown = true;
final Thread waiter;
synchronized (this) {
- eof = true;
+ super.setEof();
if (readWaiter == null) {
return;
}
waiter = readWaiter;
}
LockSupport.unpark(waiter);
- return;
}
public boolean isShutdownWrites() {
@@ -610,28 +473,6 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
return 0;
}
- @Override
- public synchronized int read(ByteBuffer dst) throws IOException {
- if (closed && checkClosed) {
- throw new ClosedChannelException();
- }
- if (readEnabled) {
- try {
- if ((!readBuffer.hasRemaining() || readBuffer.position() == 0 && readBuffer.limit() == readBuffer.capacity()) && eof) {
- return -1;
- }
- if (readBuffer.limit() == readBuffer.capacity() && readBuffer.position() == 0) {
- return 0;
- }
- return Buffers.copy(dst, readBuffer);
- } catch (RuntimeException e) {
- System.out.println("Got exception at attempt of copying contents of dst "+ dst.remaining() + " into read buffer " + readBuffer.remaining());
- throw e;
- }
- }
- return 0;
- }
-
@Override
public synchronized long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
if (closed && checkClosed) {
@@ -648,7 +489,7 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
}
return 0;
}
-
+
public synchronized boolean allReadDataConsumed() {
return readBuffer.position() == readBuffer.limit();
}
@@ -790,4 +631,12 @@ public class ConnectedStreamChannelMock implements ConnectedStreamChannel, Strea
public void setInfo(String i) {
info = i;
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ shutdownWrites();
+ shutdownReads();
+ }
+
}
=====================================
api/src/test/java/org/xnio/mock/MessageConduitMock.java
=====================================
@@ -0,0 +1,76 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * Copyright 2022 Red Hat, Inc. and/or its affiliates, and individual
+ * contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.mock;
+
+import org.xnio.Buffers;
+import org.xnio.XnioIoThread;
+import org.xnio.XnioWorker;
+import org.xnio.conduits.MessageSinkConduit;
+import org.xnio.conduits.MessageSourceConduit;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Mock for {@code MessageSinkConduit} and {@code MessageSourceConduit}.
+ *
+ * @author <a href="mailto:frainone at redhat.com">Flavia Rainone</a>
+ */
+public class MessageConduitMock extends ConduitMock implements MessageSinkConduit, MessageSourceConduit {
+
+
+ public MessageConduitMock(XnioWorker worker, XnioIoThread xnioIoThread) {
+ super(worker, xnioIoThread);
+ }
+
+ @Override
+ public boolean send(ByteBuffer src) throws IOException {
+ return src.remaining() == write(src);
+ }
+
+ @Override
+ public boolean send(ByteBuffer[] srcs, int offs, int len) throws IOException {
+ return Buffers.remaining(srcs, offs, len) == write(srcs, offs, len);
+ }
+
+ @Override
+ public boolean sendFinal(ByteBuffer src) throws IOException {
+ boolean sent = send(src);
+ terminateWrites();
+ return sent;
+ }
+
+ @Override
+ public boolean sendFinal(ByteBuffer[] srcs, int offs, int len) throws IOException {
+ boolean sent = send(srcs, offs, len);
+ terminateWrites();
+ return sent;
+ }
+
+ @Override
+ public int receive(ByteBuffer dst) throws IOException {
+ return read(dst);
+ }
+
+ @Override
+ public long receive(ByteBuffer[] dsts, int offs, int len) throws IOException {
+ return read(dsts, offs, len);
+ }
+}
\ No newline at end of file
=====================================
api/src/test/java/org/xnio/mock/ReadableByteChannelMock.java
=====================================
@@ -0,0 +1,217 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * Copyright 2022 Red Hat, Inc. and/or its affiliates, and individual
+ * contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio.mock;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+
+import org.xnio.Buffers;
+
+/**
+ * Mock of a connected stream channel.<p>
+ * This channel mock will store everything that is written to it for later comparison, and allows feeding of bytes for
+ * reading.
+ *
+ * @author <a href="mailto:flavia.rainone at jboss.com">Flavia Rainone</a>
+ */
+public class ReadableByteChannelMock implements ReadableByteChannel {
+
+ // read stuff will be taken from this buffer
+ protected ByteBuffer readBuffer = ByteBuffer.allocate(10000);
+ // read stuff can only be read if read is enabled
+ protected boolean readEnabled;
+ // indicates if this channel is closed
+ protected boolean closed = false;
+ protected boolean checkClosed = true;
+ protected boolean eof = false;
+
+ /**
+ * Feeds {@code readData} to read clients.
+ * @param readData data that will be available for reading on this channel mock
+ */
+ public void setReadData(String... readData) {
+ doSetReadData(readData);
+ }
+
+ protected synchronized int doSetReadData(String... readData) {
+ int totalLength = 0;
+ for (String data: readData) {
+ totalLength += data.length();
+ }
+ int position = readBuffer.position();
+ boolean resetPosition = false;
+ if (!readBuffer.hasRemaining()) {
+ readBuffer.compact();
+ } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
+ if (readBuffer.capacity() - readBuffer.limit() < totalLength) {
+ if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() >= totalLength) {
+ readBuffer.compact();
+ }
+ throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
+ }
+ int limit = readBuffer.limit();
+ readBuffer.position(limit);
+ readBuffer.limit(limit + totalLength);
+ resetPosition = true;
+ }
+ for (String data: readData) {
+ readBuffer.put(data.getBytes(StandardCharsets.UTF_8));
+ }
+ readBuffer.flip();
+ if (resetPosition) {
+ readBuffer.position(position);
+ }
+ return totalLength;
+ }
+
+ /**
+ * Feeds {@code readData} to read clients.
+ * @param readData data that will be available for reading on this channel mock
+ */
+ public void setReadDataWithLength(String... readData) {
+ doSetReadDataWithLength(readData);
+ }
+
+ protected synchronized int doSetReadDataWithLength(String... readData) {
+ if (eof) {
+ throw new IllegalStateException("Cannot add read data once eof is set");
+ }
+ int totalLength = 0;
+ for (String data: readData) {
+ totalLength += data.length();
+ }
+ int position = readBuffer.position();
+ boolean resetPosition = false;
+ if (!readBuffer.hasRemaining()) {
+ readBuffer.compact();
+ } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
+ if (readBuffer.capacity() - readBuffer.limit() + 4 < totalLength) {
+ if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() + 4 >= totalLength) {
+ readBuffer.compact();
+ }
+ throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
+ }
+ int limit = readBuffer.limit();
+ readBuffer.position(limit);
+ readBuffer.limit(limit + totalLength + 4);
+ resetPosition = true;
+ }
+ readBuffer.putInt(totalLength);
+ for (String data: readData) {
+ readBuffer.put(data.getBytes(StandardCharsets.UTF_8));
+ }
+ readBuffer.flip();
+ if (resetPosition) {
+ readBuffer.position(position);
+ }
+ return totalLength;
+ }
+
+ /**
+ * Feeds {@code readData} to read clients.
+ * @param readData data that will be available for reading on this channel mock
+ */
+ public void setReadDataWithLength(int length, String... readData) {
+ doSetReadDataWithLength(length, readData);
+ }
+
+ protected synchronized int doSetReadDataWithLength(int length, String... readData) {
+ if (eof) {
+ throw new IllegalStateException("Cannot add read data once eof is set");
+ }
+ int totalLength = 0;
+ for (String data: readData) {
+ totalLength += data.length();
+ }
+ int position = readBuffer.position();
+ boolean resetPosition = false;
+ if (!readBuffer.hasRemaining()) {
+ readBuffer.compact();
+ } else if(readBuffer.position() > 0 || readBuffer.limit() != readBuffer.capacity()) {
+ if (readBuffer.capacity() - readBuffer.limit() + 4 < totalLength) {
+ if (readBuffer.position() > 0 && readBuffer.capacity() - readBuffer.limit() + readBuffer.position() + 4 >= totalLength) {
+ readBuffer.compact();
+ }
+ throw new RuntimeException("ReadBuffer is full - not enough space to add more read data");
+ }
+ int limit = readBuffer.limit();
+ readBuffer.position(limit);
+ readBuffer.limit(limit + totalLength + 4);
+ resetPosition = true;
+ }
+ readBuffer.putInt(length);
+ for (String data: readData) {
+ readBuffer.put(data.getBytes(StandardCharsets.UTF_8));
+ }
+ readBuffer.flip();
+ if (resetPosition) {
+ readBuffer.position(position);
+ }
+ return totalLength;
+ }
+
+ public synchronized void setEof() {
+ eof = true;
+ }
+
+ public synchronized void enableRead(boolean enable) {
+ readEnabled = enable;
+ }
+
+ public synchronized void enableClosedCheck(boolean enable) {
+ checkClosed = enable;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !closed;
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer dst) throws IOException {
+ if (closed && checkClosed) {
+ throw new ClosedChannelException();
+ }
+ if (readEnabled) {
+ try {
+ if ((!readBuffer.hasRemaining() || readBuffer.position() == 0 && readBuffer.limit() == readBuffer.capacity()) && eof) {
+ return -1;
+ }
+ if (readBuffer.limit() == readBuffer.capacity() && readBuffer.position() == 0) {
+ return 0;
+ }
+ return Buffers.copy(dst, readBuffer);
+ } catch (RuntimeException e) {
+ System.out.println("Got exception at attempt of copying contents of dst "+ dst.remaining() + " into read buffer " + readBuffer.remaining());
+ throw e;
+ }
+ }
+ return 0;
+ }
+}
=====================================
nio-impl/pom.xml
=====================================
@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.8.6.Final</version>
+ <version>3.8.7.Final</version>
</parent>
<properties>
=====================================
pom.xml
=====================================
@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
- <version>3.8.6.Final</version>
+ <version>3.8.7.Final</version>
<description>The aggregator POM of the XNIO project</description>
<licenses>
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/-/commit/42d094b0c6ced1746ea3d8b9859cae04570cec90
--
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/-/commit/42d094b0c6ced1746ea3d8b9859cae04570cec90
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20220504/a832c34c/attachment.htm>
More information about the pkg-java-commits
mailing list