[Git][java-team/jboss-xnio][master] 3 commits: New upstream version 3.7.3

Markus Koschany gitlab at salsa.debian.org
Thu Aug 15 00:38:37 BST 2019



Markus Koschany pushed to branch master at Debian Java Maintainers / jboss-xnio


Commits:
a2a44393 by Markus Koschany at 2019-08-14T23:32:28Z
New upstream version 3.7.3
- - - - -
60530894 by Markus Koschany at 2019-08-14T23:32:34Z
Update upstream source from tag 'upstream/3.7.3'

Update to upstream version '3.7.3'
with Debian dir c8d76bd25252c224d67ba93a2702426ff2759c59
- - - - -
b188b7d8 by Markus Koschany at 2019-08-14T23:32:57Z
Update changelog

- - - - -


7 changed files:

- api/pom.xml
- api/src/main/java/org/xnio/ByteBufferSlicePool.java
- + api/src/test/java/org/xnio/ByteBufferSlicePoolTestCase.java
- debian/changelog
- nio-impl/pom.xml
- nio-impl/src/main/java/org/xnio/nio/WatchServiceFileSystemWatcher.java
- pom.xml


Changes:

=====================================
api/pom.xml
=====================================
@@ -37,7 +37,7 @@
     <parent>
         <groupId>org.jboss.xnio</groupId>
         <artifactId>xnio-all</artifactId>
-        <version>3.7.2.Final</version>
+        <version>3.7.3.Final</version>
     </parent>
 
     <dependencies>


=====================================
api/src/main/java/org/xnio/ByteBufferSlicePool.java
=====================================
@@ -19,11 +19,14 @@
 
 package org.xnio;
 
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -35,14 +38,20 @@ import static org.xnio._private.Messages.msg;
  * returned pooled buffers.  When the buffer is no longer needed, it should be freed back into the pool; failure
  * to do so will cause the corresponding buffer area to be unavailable until the buffer is garbage-collected.
  *
+ * If the buffer pool is no longer used, it is advisable to invoke {@link #clean()} to make
+ * sure that direct allocated buffers can be reused by a future instance.
+ *
  * @author <a href="mailto:david.lloyd at redhat.com">David M. Lloyd</a>
+ * @author Flavia Rainone
  * @deprecated See {@link ByteBufferPool}.
  */
 public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
 
     private static final int LOCAL_LENGTH;
+    private static final Queue<ByteBuffer> FREE_DIRECT_BUFFERS;
 
     static {
+        // read thread local size property
         String value = AccessController.doPrivileged(new ReadPropertyAction("xnio.bufferpool.threadlocal.size", "12"));
         int val;
         try {
@@ -51,30 +60,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
             val = 12;
         }
         LOCAL_LENGTH = val;
+
+        // free direct buffers queue to keep direct buffers that are out of reach because of garbage collection of pools
+        FREE_DIRECT_BUFFERS = new ConcurrentLinkedQueue<>();
     }
 
-    private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<Ref>());
+    private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<>());
     private final Queue<Slice> sliceQueue;
     private final BufferAllocator<ByteBuffer> allocator;
     private final int bufferSize;
     private final int buffersPerRegion;
     private final int threadLocalQueueSize;
-    private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
-        protected ThreadLocalCache initialValue() {
-            //noinspection serial
-            return new ThreadLocalCache();
-        }
-
-        public void remove() {
-            final ArrayDeque<Slice> deque = get().queue;
-            Slice slice = deque.poll();
-            while (slice != null) {
-                doFree(slice);
-                slice = deque.poll();
-            }
-            super.remove();
-        }
-    };
+    private final List<ByteBuffer> directBuffers;
+    private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocalCacheWrapper(this);
 
     /**
      * Construct a new instance.
@@ -94,8 +92,14 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
         buffersPerRegion = maxRegionSize / bufferSize;
         this.bufferSize = bufferSize;
         this.allocator = allocator;
-        sliceQueue = new ConcurrentLinkedQueue<Slice>();
+        sliceQueue = new ConcurrentLinkedQueue<>();
         this.threadLocalQueueSize = threadLocalQueueSize;
+        // handle direct byte buffer allocation for reuse of direct buffers
+        if (allocator == BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR) {
+            directBuffers = Collections.synchronizedList(new ArrayList<>());
+        } else {
+            directBuffers = null;
+        }
     }
 
     /**
@@ -142,31 +146,72 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
             if (slice != null) {
                 return new PooledByteBuffer(slice, slice.slice());
             }
-            final int bufferSize = this.bufferSize;
-            final int buffersPerRegion = this.buffersPerRegion;
-            final ByteBuffer region = allocator.allocate(buffersPerRegion * bufferSize);
-            int idx = bufferSize;
-            for (int i = 1; i < buffersPerRegion; i ++) {
-                sliceQueue.add(new Slice(region, idx, bufferSize));
-                idx += bufferSize;
-            }
-            final Slice newSlice = new Slice(region, 0, bufferSize);
+            final Slice newSlice = allocateSlices(buffersPerRegion, bufferSize);
             return new PooledByteBuffer(newSlice, newSlice.slice());
         }
     }
 
+    private Slice allocateSlices(final int buffersPerRegion, final int bufferSize) {
+        // only true if using direct allocation
+        if (directBuffers != null) {
+            ByteBuffer region = FREE_DIRECT_BUFFERS.poll();
+            try {
+                if (region != null) {
+                    return sliceReusedBuffer(region, buffersPerRegion, bufferSize);
+                }
+                region = allocator.allocate(buffersPerRegion * bufferSize);
+                return sliceAllocatedBuffer(region, buffersPerRegion, bufferSize);
+            } finally {
+                directBuffers.add(region);
+            }
+        }
+        return sliceAllocatedBuffer(
+                allocator.allocate(buffersPerRegion * bufferSize),
+                buffersPerRegion, bufferSize);
+    }
+
+    private Slice sliceReusedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
+        int maxI = Math.min(buffersPerRegion, region.capacity() / bufferSize);
+        // create slices
+        int idx = bufferSize;
+        for (int i = 1; i < maxI; i++) {
+            sliceQueue.add(new Slice(region, idx, bufferSize));
+            idx += bufferSize;
+        }
+
+        if (maxI == 0)
+            return allocateSlices(buffersPerRegion, bufferSize);
+        if (maxI < buffersPerRegion)
+            sliceQueue.add(allocateSlices(buffersPerRegion - maxI, bufferSize));
+        return new Slice(region, 0, bufferSize);
+
+    }
+
+    private Slice sliceAllocatedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
+        // create slices
+        int idx = bufferSize;
+        for (int i = 1; i < buffersPerRegion; i++) {
+            sliceQueue.add(new Slice(region, idx, bufferSize));
+            idx += bufferSize;
+        }
+        return new Slice(region, 0, bufferSize);
+    }
+
     /**
-     * Cleans all ThreadLocal caches
+     * Cleans the pool, removing references to any buffers inside it.
+     * Should be invoked on pool disposal, when the pool will no longer be
+     * used.
      */
     public void clean() {
         ThreadLocalCache localCache = localQueueHolder.get();
         if (!localCache.queue.isEmpty()) {
             localCache.queue.clear();
         }
-
         if(!sliceQueue.isEmpty()) {
             sliceQueue.clear();
         }
+        // pass everything that is directly allocated to free direct buffers
+        FREE_DIRECT_BUFFERS.addAll(directBuffers);
     }
 
     /**
@@ -176,6 +221,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
         return bufferSize;
     }
 
+    private ThreadLocalCache createThreadLocalCache() {
+        return new ThreadLocalCache(this);
+    }
+
+    private void freeThreadLocalCache(ThreadLocalCache cache) {
+        final ArrayDeque<Slice> deque = cache.queue;
+        Slice slice = deque.poll();
+        while (slice != null) {
+            doFree(slice);
+            slice = deque.poll();
+        }
+    }
+
     private void doFree(Slice region) {
         if (threadLocalQueueSize > 0) {
             final ThreadLocalCache localCache = localQueueHolder.get();
@@ -239,11 +297,15 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
         }
     }
 
-    private final class Slice {
+    // to prevent memory leaks via thread internal map for thread local, we need to
+    // make this class static or else the outer ByteBufferSlicePool
+    // is never collected while the thread is active
+    // Thread -> thread local map -> ThreadLocalCacheWrapper -> ThreadLocalCache -> queue -> Slices -> ByteBufferSlicePool
+    private static final class Slice {
         private final ByteBuffer parent;
 
         private Slice(final ByteBuffer parent, final int start, final int size) {
-            this.parent = (ByteBuffer)parent.duplicate().position(start).limit(start+size);
+            this.parent = (ByteBuffer) parent.duplicate().position(start).limit(start+size);
         }
 
         ByteBuffer slice() {
@@ -265,26 +327,68 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
         }
     }
 
-    private final class ThreadLocalCache {
+    final static class ThreadLocalCache {
+        // to prevent memory leaks via thread internal map for thread local, we need to
+        // weakly reference the outer ByteBufferSlicePool
+        // or else the pool is never collected while the thread is active
+        // Thread -> thread local map -> ThreadLocalCache -> pool
+        final WeakReference<ByteBufferSlicePool> pool;
 
-        final ArrayDeque<Slice> queue =  new ArrayDeque<Slice>(threadLocalQueueSize) {
+        // internal queue of slices; used to prevent all threads synchronizing on a single queue
+        final ArrayDeque<Slice> queue;
+        // indicates how many slices should be returned to queue on free
+        int outstanding = 0;
 
-            /**
-             * This sucks but there's no other way to ensure these buffers are returned to the pool.
-             */
-            protected void finalize() {
-                final ArrayDeque<Slice> deque = queue;
-                Slice slice = deque.poll();
-                while (slice != null) {
-                    doFree(slice);
-                    slice = deque.poll();
+        ThreadLocalCache(ByteBufferSlicePool pool) {
+            this.pool = new WeakReference<>(pool);
+            this.queue = new ArrayDeque<Slice>(pool.threadLocalQueueSize) {
+                /**
+                 * This sucks but there's no other way to ensure these buffers are returned to the pool.
+                 */
+                protected void finalize() {
+                    final ByteBufferSlicePool pool = ThreadLocalCache.this.pool.get();
+                    if (pool == null)
+                        return;
+                    final ArrayDeque<Slice> deque = queue;
+                    Slice slice = deque.poll();
+                    while (slice != null) {
+                        pool.doFree(slice);
+                        slice = deque.poll();
+                    }
                 }
-            }
-        };
+            };
+        }
+    }
 
-        int outstanding = 0;
+    private static class ThreadLocalCacheWrapper extends ThreadLocal<ThreadLocalCache> {
+        // to prevent memory leaks via thread internal map for thread local, we need to
+        // weakly reference the outer ByteBufferSlicePool
+        // or else the pool is never collected while the thread is active
+        // Thread -> thread local map -> ThreadLocalCacheWrapper -> pool
+        private final WeakReference<ByteBufferSlicePool> pool;
+
+        ThreadLocalCacheWrapper(ByteBufferSlicePool pool) {
+            this.pool = new WeakReference<>(pool);
+        }
 
-        ThreadLocalCache() {
+        protected ThreadLocalCache initialValue() {
+            final ByteBufferSlicePool pool = this.pool.get();
+            if (pool != null) {
+                //noinspection serial
+                return pool.createThreadLocalCache();
+            }
+            return null;
+        }
+
+        public void remove() {
+            final ByteBufferSlicePool pool = this.pool.get();
+            final ThreadLocalCache cache = get();
+            if (pool != null && cache != null) {
+                //noinspection serial
+                pool.freeThreadLocalCache(cache);
+            }
+            super.remove();
         }
     }
+
 }


=====================================
api/src/test/java/org/xnio/ByteBufferSlicePoolTestCase.java
=====================================
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ *
+ * Copyright 2019 Red Hat, Inc. and/or its affiliates, and individual
+ * contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.xnio;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link ByteBufferSlicePool}.
+ * 
+ * @author <a href="mailto:flavia.rainone at jboss.com">Flavia Rainone</a>
+ *
+ */
+public class ByteBufferSlicePoolTestCase {
+
+    @Test
+    public void basicSlicePool() {
+        ByteBufferSlicePool slicePool = new ByteBufferSlicePool(10, 10);
+        for (int i = 0; i < 10; i++){
+            Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+            assertNotNull(pooledBuffer);
+            ByteBuffer buffer = pooledBuffer.getResource();
+            assertNotNull(buffer);
+            assertEquals(10, buffer.capacity());
+            assertEquals(0, buffer.position());
+            pooledBuffer.free();
+            boolean failed = false;
+            try {
+                pooledBuffer.getResource();
+            } catch (IllegalStateException expected) {
+                failed = true;
+            }
+            assertTrue(failed);
+        }
+    }
+
+    @Test
+    public void bufferAllocatorSlicePool() {
+        ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 2, 5);
+        for (int i = 0; i < 2; i++){
+            Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+            assertNotNull(pooledBuffer);
+            ByteBuffer buffer = pooledBuffer.getResource();
+            assertNotNull(buffer);
+            assertEquals(2, buffer.capacity());
+            assertEquals(0, buffer.position());
+            pooledBuffer.free();
+            boolean failed = false;
+            try {
+                pooledBuffer.getResource();
+            } catch (IllegalStateException expected) {
+                failed = true;
+            }
+            assertTrue(failed);
+        }
+    }
+
+    @Test
+    public void bufferAllocatorNoThreadLocalSlicePool() {
+        ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 8, 12, 0);
+        Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
+        assertNotNull(pooledBuffer);
+        ByteBuffer buffer = pooledBuffer.getResource();
+        assertNotNull(buffer);
+        assertEquals(8, buffer.capacity());
+        assertEquals(0, buffer.position());
+        pooledBuffer.free();
+        boolean failed = false;
+        try {
+            pooledBuffer.getResource();
+        } catch (IllegalStateException expected) {
+            failed = true;
+        }
+        assertTrue(failed);
+    }
+
+    @Test
+    public void directBufferAllocator() throws InterruptedException {
+        for (int i = 0; i < 100; i++)
+            stressDirectBufferReuse();
+    }
+
+    private void stressDirectBufferReuse() throws InterruptedException {
+        ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing the same size
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing for a smaller size
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing for a bigger size
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 10, 20);
+        slicePool.allocate();
+        slicePool.clean();
+    }
+
+
+    @Test
+    public void directBufferAllocator2() throws InterruptedException {
+        for (int i = 0; i < 100; i++)
+            stressDirectBufferReuse2();
+    }
+
+    private void stressDirectBufferReuse2() throws InterruptedException {
+        ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing the same size
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing for a smaller size
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
+        slicePool.allocate();
+        slicePool.clean();
+
+        // reusing for a bigger size whose first buffer is bigger than all previous allocated byte buffer regions
+        slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 250, 10000);
+        slicePool.allocate();
+        slicePool.clean();
+    }
+
+}
\ No newline at end of file


=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+jboss-xnio (3.7.3-1) unstable; urgency=medium
+
+  * New upstream version 3.7.3.
+
+ -- Markus Koschany <apo at debian.org>  Thu, 15 Aug 2019 01:32:45 +0200
+
 jboss-xnio (3.7.2-1) unstable; urgency=medium
 
   * New upstream version 3.7.2.


=====================================
nio-impl/pom.xml
=====================================
@@ -31,7 +31,7 @@
     <parent>
         <groupId>org.jboss.xnio</groupId>
         <artifactId>xnio-all</artifactId>
-        <version>3.7.2.Final</version>
+        <version>3.7.3.Final</version>
     </parent>
     
     <properties>


=====================================
nio-impl/src/main/java/org/xnio/nio/WatchServiceFileSystemWatcher.java
=====================================
@@ -126,6 +126,16 @@ class WatchServiceFileSystemWatcher implements FileSystemWatcher, Runnable {
                             while (it.hasNext()) {
                                 FileChangeEvent event = it.next();
                                 if (event.getType() == FileChangeEvent.Type.MODIFIED) {
+                                    if (addedFiles.contains(event.getFile()) &&
+                                            deletedFiles.contains(event.getFile())) {
+                                        // XNIO-344
+                                        // All file change events (ADDED, REMOVED and MODIFIED) occurred here.
+                                        // This happens when an updated file is moved from the different
+                                        // filesystems or the directory having different project quota on Linux.
+                                        // ADDED and REMOVED events will be removed in the latter conditional branching.
+                                        // So, this MODIFIED event needs to be kept for the file change notification.
+                                        continue;
+                                    }
                                     if (addedFiles.contains(event.getFile()) ||
                                             deletedFiles.contains(event.getFile())) {
                                         it.remove();


=====================================
pom.xml
=====================================
@@ -32,7 +32,7 @@
     <artifactId>xnio-all</artifactId>
     <packaging>pom</packaging>
     <name>XNIO Parent POM</name>
-    <version>3.7.2.Final</version>
+    <version>3.7.3.Final</version>
     <description>The aggregator POM of the XNIO project</description>
 
     <licenses>



View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/compare/639d9b6f51410cf11a29f85dd4a54f2d1db6230e...b188b7d8ec71f8510f39988a24fbad4fa11e92ef

-- 
View it on GitLab: https://salsa.debian.org/java-team/jboss-xnio/compare/639d9b6f51410cf11a29f85dd4a54f2d1db6230e...b188b7d8ec71f8510f39988a24fbad4fa11e92ef
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20190814/e8a41eb3/attachment.html>


More information about the pkg-java-commits mailing list