[Git][java-team/jboss-xnio][master] 3 commits: New upstream version 3.7.3
Markus Koschany
gitlab at salsa.debian.org
Thu Aug 15 00:38:37 BST 2019
Markus Koschany pushed to branch master at Debian Java Maintainers / jboss-xnio
Commits:
a2a44393 by Markus Koschany at 2019-08-14T23:32:28Z
New upstream version 3.7.3
- - - - -
60530894 by Markus Koschany at 2019-08-14T23:32:34Z
Update upstream source from tag 'upstream/3.7.3'
Update to upstream version '3.7.3'
with Debian dir c8d76bd25252c224d67ba93a2702426ff2759c59
- - - - -
b188b7d8 by Markus Koschany at 2019-08-14T23:32:57Z
Update changelog
- - - - -
7 changed files:
- api/pom.xml
- api/src/main/java/org/xnio/ByteBufferSlicePool.java
- + api/src/test/java/org/xnio/ByteBufferSlicePoolTestCase.java
- debian/changelog
- nio-impl/pom.xml
- nio-impl/src/main/java/org/xnio/nio/WatchServiceFileSystemWatcher.java
- pom.xml
Changes:
=====================================
api/pom.xml
=====================================
@@ -37,7 +37,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.7.2.Final</version>
+ <version>3.7.3.Final</version>
</parent>
<dependencies>
=====================================
api/src/main/java/org/xnio/ByteBufferSlicePool.java
=====================================
@@ -19,11 +19,14 @@
package org.xnio;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -35,14 +38,20 @@ import static org.xnio._private.Messages.msg;
* returned pooled buffers. When the buffer is no longer needed, it should be freed back into the pool; failure
* to do so will cause the corresponding buffer area to be unavailable until the buffer is garbage-collected.
*
+ * If the buffer pool is no longer used, it is advisable to invoke {@link #clean()} to make
+ * sure that directly allocated buffers can be reused by a future instance.
+ *
* @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ * @author Flavia Rainone
* @deprecated See {@link ByteBufferPool}.
*/
public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private static final int LOCAL_LENGTH;
+ private static final Queue<ByteBuffer> FREE_DIRECT_BUFFERS;
static {
+ // read thread local size property
String value = AccessController.doPrivileged(new ReadPropertyAction("xnio.bufferpool.threadlocal.size", "12"));
int val;
try {
@@ -51,30 +60,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
val = 12;
}
LOCAL_LENGTH = val;
+
+ // free direct buffers queue to keep direct buffers that are out of reach because of garbage collection of pools
+ FREE_DIRECT_BUFFERS = new ConcurrentLinkedQueue<>();
}
- private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<Ref>());
+ private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<>());
private final Queue<Slice> sliceQueue;
private final BufferAllocator<ByteBuffer> allocator;
private final int bufferSize;
private final int buffersPerRegion;
private final int threadLocalQueueSize;
- private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
- protected ThreadLocalCache initialValue() {
- //noinspection serial
- return new ThreadLocalCache();
- }
-
- public void remove() {
- final ArrayDeque<Slice> deque = get().queue;
- Slice slice = deque.poll();
- while (slice != null) {
- doFree(slice);
- slice = deque.poll();
- }
- super.remove();
- }
- };
+ private final List<ByteBuffer> directBuffers;
+ private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocalCacheWrapper(this);
/**
* Construct a new instance.
@@ -94,8 +92,14 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
buffersPerRegion = maxRegionSize / bufferSize;
this.bufferSize = bufferSize;
this.allocator = allocator;
- sliceQueue = new ConcurrentLinkedQueue<Slice>();
+ sliceQueue = new ConcurrentLinkedQueue<>();
this.threadLocalQueueSize = threadLocalQueueSize;
+ // handle direct byte buffer allocation for reuse of direct buffers
+ if (allocator == BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR) {
+ directBuffers = Collections.synchronizedList(new ArrayList<>());
+ } else {
+ directBuffers = null;
+ }
}
/**
@@ -142,31 +146,72 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
if (slice != null) {
return new PooledByteBuffer(slice, slice.slice());
}
- final int bufferSize = this.bufferSize;
- final int buffersPerRegion = this.buffersPerRegion;
- final ByteBuffer region = allocator.allocate(buffersPerRegion * bufferSize);
- int idx = bufferSize;
- for (int i = 1; i < buffersPerRegion; i ++) {
- sliceQueue.add(new Slice(region, idx, bufferSize));
- idx += bufferSize;
- }
- final Slice newSlice = new Slice(region, 0, bufferSize);
+ final Slice newSlice = allocateSlices(buffersPerRegion, bufferSize);
return new PooledByteBuffer(newSlice, newSlice.slice());
}
}
+ private Slice allocateSlices(final int buffersPerRegion, final int bufferSize) {
+ // only true if using direct allocation
+ if (directBuffers != null) {
+ ByteBuffer region = FREE_DIRECT_BUFFERS.poll();
+ try {
+ if (region != null) {
+ return sliceReusedBuffer(region, buffersPerRegion, bufferSize);
+ }
+ region = allocator.allocate(buffersPerRegion * bufferSize);
+ return sliceAllocatedBuffer(region, buffersPerRegion, bufferSize);
+ } finally {
+ directBuffers.add(region);
+ }
+ }
+ return sliceAllocatedBuffer(
+ allocator.allocate(buffersPerRegion * bufferSize),
+ buffersPerRegion, bufferSize);
+ }
+
+ private Slice sliceReusedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
+ int maxI = Math.min(buffersPerRegion, region.capacity() / bufferSize);
+ // create slices
+ int idx = bufferSize;
+ for (int i = 1; i < maxI; i++) {
+ sliceQueue.add(new Slice(region, idx, bufferSize));
+ idx += bufferSize;
+ }
+
+ if (maxI == 0)
+ return allocateSlices(buffersPerRegion, bufferSize);
+ if (maxI < buffersPerRegion)
+ sliceQueue.add(allocateSlices(buffersPerRegion - maxI, bufferSize));
+ return new Slice(region, 0, bufferSize);
+
+ }
+
+ private Slice sliceAllocatedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
+ // create slices
+ int idx = bufferSize;
+ for (int i = 1; i < buffersPerRegion; i++) {
+ sliceQueue.add(new Slice(region, idx, bufferSize));
+ idx += bufferSize;
+ }
+ return new Slice(region, 0, bufferSize);
+ }
+
/**
- * Cleans all ThreadLocal caches
+ * Cleans the pool, removing references to any buffers inside it.
+ * Should be invoked on pool disposal, when the pool will no longer be
+ * used.
*/
public void clean() {
ThreadLocalCache localCache = localQueueHolder.get();
if (!localCache.queue.isEmpty()) {
localCache.queue.clear();
}
-
if(!sliceQueue.isEmpty()) {
sliceQueue.clear();
}
+ // pass everything that is directly allocated to free direct buffers
+ FREE_DIRECT_BUFFERS.addAll(directBuffers);
}
/**
@@ -176,6 +221,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
return bufferSize;
}
+ private ThreadLocalCache createThreadLocalCache() {
+ return new ThreadLocalCache(this);
+ }
+
+ private void freeThreadLocalCache(ThreadLocalCache cache) {
+ final ArrayDeque<Slice> deque = cache.queue;
+ Slice slice = deque.poll();
+ while (slice != null) {
+ doFree(slice);
+ slice = deque.poll();
+ }
+ }
+
private void doFree(Slice region) {
if (threadLocalQueueSize > 0) {
final ThreadLocalCache localCache = localQueueHolder.get();
@@ -239,11 +297,15 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
}
- private final class Slice {
+ // to prevent memory leaks via thread internal map for thread local, we need to
+ // make this class static or else the outer ByteBufferSlicePool
+ // is never collected while the thread is active
+ // Thread -> thread local map -> ThreadLocalCacheWrapper -> ThreadLocalCache -> queue -> Slices -> ByteBufferSlicePool
+ private static final class Slice {
private final ByteBuffer parent;
private Slice(final ByteBuffer parent, final int start, final int size) {
- this.parent = (ByteBuffer)parent.duplicate().position(start).limit(start+size);
+ this.parent = (ByteBuffer) parent.duplicate().position(start).limit(start+size);
}
ByteBuffer slice() {
@@ -265,26 +327,68 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
}
- private final class ThreadLocalCache {
+ final static class ThreadLocalCache {
+ // to prevent memory leaks via thread internal map for thread local, we need to
+ // weakly reference the outer ByteBufferSlicePool
+ // or else the pool is never collected while the thread is active
+ // Thread -> thread local map -> ThreadLocalCache -> pool
+ final WeakReference<ByteBufferSlicePool> pool;
- final ArrayDeque<Slice> queue = new ArrayDeque<Slice>(threadLocalQueueSize) {
+ // internal queue of slices; used to prevent all threads synchronizing on a single queue
+ final ArrayDeque<Slice> queue;
+ // indicates how many slices should be returned to queue on free
+ int outstanding = 0;
- /**
- * This sucks but there's no other way to ensure these buffers are returned to the pool.
- */
- protected void finalize() {
- final ArrayDeque<Slice> deque = queue;
- Slice slice = deque.poll();
- while (slice != null) {
- doFree(slice);
- slice = deque.poll();
+ ThreadLocalCache(ByteBufferSlicePool pool) {
+ this.pool = new WeakReference<>(pool);
+ this.queue = new ArrayDeque<Slice>(pool.threadLocalQueueSize) {
+ /**
+ * This sucks but there's no other way to ensure these buffers are returned to the pool.
+ */
+ protected void finalize() {
+ final ByteBufferSlicePool pool = ThreadLocalCache.this.pool.get();
+ if (pool == null)
+ return;
+ final ArrayDeque<Slice> deque = queue;
+ Slice slice = deque.poll();
+ while (slice != null) {
+ pool.doFree(slice);
+ slice = deque.poll();
+ }
}
- }
- };
+ };
+ }
+ }
- int outstanding = 0;
+ private static class ThreadLocalCacheWrapper extends ThreadLocal<ThreadLocalCache> {
+ // to prevent memory leaks via thread internal map for thread local, we need to
+ // weakly reference the outer ByteBufferSlicePool
+ // or else the pool is never collected while the thread is active
+ // Thread -> thread local map -> ThreadLocalCacheWrapper -> pool
+ private final WeakReference<ByteBufferSlicePool> pool;
+
+ ThreadLocalCacheWrapper(ByteBufferSlicePool pool) {
+ this.pool = new WeakReference<>(pool);
+ }
- ThreadLocalCache() {
+ protected ThreadLocalCache initialValue() {
+ final ByteBufferSlicePool pool = this.pool.get();
+ if (pool != null) {
+ //noinspection serial
+ return pool.createThreadLocalCache();
+ }
+ return null;
+ }
+
+ public void remove() {
+ final ByteBufferSlicePool pool = this.pool.get();
+ final ThreadLocalCache cache = get();
+ if (pool != null && cache != null) {
+ //noinspection serial
+ pool.freeThreadLocalCache(cache);
+ }
+ super.remove();
}
}
+
}
=====================================
api/src/test/java/org/xnio/ByteBufferSlicePoolTestCase.java
=====================================
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates, and individual
+ * contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link ByteBufferSlicePool}.
+ *
+ * @author <a href="mailto:flavia.rainone at jboss.com">Flavia Rainone</a>
+ *
+ */
+public class ByteBufferSlicePoolTestCase {
+
+ @Test
+ public void basicSlicePool() {
+ ByteBufferSlicePool slicePool = new ByteBufferSlicePool(10, 10);
+ for (int i = 0; i < 10; i++){
+ Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+ assertNotNull(pooledBuffer);
+ ByteBuffer buffer = pooledBuffer.getResource();
+ assertNotNull(buffer);
+ assertEquals(10, buffer.capacity());
+ assertEquals(0, buffer.position());
+ pooledBuffer.free();
+ boolean failed = false;
+ try {
+ pooledBuffer.getResource();
+ } catch (IllegalStateException expected) {
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+ }
+
+ @Test
+ public void bufferAllocatorSlicePool() {
+ ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 2, 5);
+ for (int i = 0; i < 2; i++){
+ Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+ assertNotNull(pooledBuffer);
+ ByteBuffer buffer = pooledBuffer.getResource();
+ assertNotNull(buffer);
+ assertEquals(2, buffer.capacity());
+ assertEquals(0, buffer.position());
+ pooledBuffer.free();
+ boolean failed = false;
+ try {
+ pooledBuffer.getResource();
+ } catch (IllegalStateException expected) {
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+ }
+
+ @Test
+ public void bufferAllocatorNoThreadLocalSlicePool() {
+ ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 8, 12, 0);
+ Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+ assertNotNull(pooledBuffer);
+ ByteBuffer buffer = pooledBuffer.getResource();
+ assertNotNull(buffer);
+ assertEquals(8, buffer.capacity());
+ assertEquals(0, buffer.position());
+ pooledBuffer.free();
+ boolean failed = false;
+ try {
+ pooledBuffer.getResource();
+ } catch (IllegalStateException expected) {
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+
+ @Test
+ public void directBufferAllocator() throws InterruptedException {
+ for (int i = 0; i < 100; i++)
+ stressDirectBufferReuse();
+ }
+
+ private void stressDirectBufferReuse() throws InterruptedException {
+ ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing the same size
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing for a smaller size
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing for a bigger size
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 10, 20);
+ slicePool.allocate();
+ slicePool.clean();
+ }
+
+
+ @Test
+ public void directBufferAllocator2() throws InterruptedException {
+ for (int i = 0; i < 100; i++)
+ stressDirectBufferReuse2();
+ }
+
+ private void stressDirectBufferReuse2() throws InterruptedException {
+ ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing the same size
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing for a smaller size
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
+ slicePool.allocate();
+ slicePool.clean();
+
+ // reusing for a bigger size whose first buffer is bigger than all previously allocated byte buffer regions
+ slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 250, 10000);
+ slicePool.allocate();
+ slicePool.clean();
+ }
+
+}
\ No newline at end of file
=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+jboss-xnio (3.7.3-1) unstable; urgency=medium
+
+ * New upstream version 3.7.3.
+
+ -- Markus Koschany <apo at debian.org> Thu, 15 Aug 2019 01:32:45 +0200
+
jboss-xnio (3.7.2-1) unstable; urgency=medium
* New upstream version 3.7.2.
=====================================
nio-impl/pom.xml
=====================================
@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
- <version>3.7.2.Final</version>
+ <version>3.7.3.Final</version>
</parent>
<properties>
=====================================
nio-impl/src/main/java/org/xnio/nio/WatchServiceFileSystemWatcher.java
=====================================
@@ -126,6 +126,16 @@ class WatchServiceFileSystemWatcher implements FileSystemWatcher, Runnable {
while (it.hasNext()) {
FileChangeEvent event = it.next();
if (event.getType() == FileChangeEvent.Type.MODIFIED) {
+ if (addedFiles.contains(event.getFile()) &&
+ deletedFiles.contains(event.getFile())) {
+ // XNIO-344
+ // All three file change events (ADDED, REMOVED and MODIFIED) occurred for this file.
+ // This happens when an updated file is moved in from a different
+ // filesystem, or from a directory with a different project quota on Linux.
+ // The ADDED and REMOVED events will be removed in the conditional branch below,
+ // so this MODIFIED event needs to be kept for the file change notification.
+ continue;
+ }
if (addedFiles.contains(event.getFile()) ||
deletedFiles.contains(event.getFile())) {
it.remove();
=====================================
pom.xml
=====================================
@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
- <version>3.7.2.Final</version>
+ <version>3.7.3.Final</version>
<description>The aggregator POM of the XNIO project</description>
<licenses>
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/compare/639d9b6f51410cf11a29f85dd4a54f2d1db6230e...b188b7d8ec71f8510f39988a24fbad4fa11e92ef
--
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/compare/639d9b6f51410cf11a29f85dd4a54f2d1db6230e...b188b7d8ec71f8510f39988a24fbad4fa11e92ef
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20190814/e8a41eb3/attachment.html>
More information about the pkg-java-commits
mailing list