[Git][java-team/snappy-java][upstream] 2 commits: New upstream version 1.1.7.4
Emmanuel Bourg
gitlab at salsa.debian.org
Mon Jun 15 22:28:20 BST 2020
Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / snappy-java
Commits:
b4ada638 by Emmanuel Bourg at 2020-06-15T23:16:59+02:00
New upstream version 1.1.7.4
- - - - -
ae0e9185 by Emmanuel Bourg at 2020-06-15T23:17:12+02:00
New upstream version 1.1.7.5
- - - - -
17 changed files:
- + .gitattributes
- Makefile
- Makefile.common
- Milestone.md
- build.sbt
- project/build.properties
- project/plugins.sbt
- src/main/java/org/xerial/snappy/SnappyFramed.java
- src/main/java/org/xerial/snappy/SnappyFramedInputStream.java
- src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java
- + src/main/java/org/xerial/snappy/pool/BufferPool.java
- + src/main/java/org/xerial/snappy/pool/CachingBufferPool.java
- + src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java
- + src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java
- + src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java
- + src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java
- version.sbt
Changes:
=====================================
.gitattributes
=====================================
@@ -0,0 +1,2 @@
+sbt text eol=lf
+
=====================================
Makefile
=====================================
@@ -90,17 +90,15 @@ snappy-header: $(SNAPPY_CMAKE_CACHE)
$(TARGET)/jni-classes/org/xerial/snappy/SnappyNative.class: $(SRC)/org/xerial/snappy/SnappyNative.java
@mkdir -p $(TARGET)/jni-classes
- $(JAVAC) -source 1.7 -target 1.7 -d $(TARGET)/jni-classes -sourcepath $(SRC) $<
+ $(JAVAC) -source 1.7 -target 1.7 -h $(SRC)/org/xerial/snappy/ -d $(TARGET)/jni-classes -sourcepath $(SRC) $<
$(SRC)/org/xerial/snappy/SnappyNative.h: $(TARGET)/jni-classes/org/xerial/snappy/SnappyNative.class
- $(JAVAH) -force -classpath $(TARGET)/jni-classes -o $@ org.xerial.snappy.SnappyNative
$(TARGET)/jni-classes/org/xerial/snappy/BitShuffleNative.class: $(SRC)/org/xerial/snappy/BitShuffleNative.java
@mkdir -p $(TARGET)/jni-classes
- $(JAVAC) -source 1.7 -target 1.7 -d $(TARGET)/jni-classes -sourcepath $(SRC) $<
+ $(JAVAC) -source 1.7 -target 1.7 -h $(SRC)/org/xerial/snappy/ -d $(TARGET)/jni-classes -sourcepath $(SRC) $<
$(SRC)/org/xerial/snappy/BitShuffleNative.h: $(TARGET)/jni-classes/org/xerial/snappy/BitShuffleNative.class
- $(JAVAH) -force -classpath $(TARGET)/jni-classes -o $@ org.xerial.snappy.BitShuffleNative
$(SNAPPY_SRC): $(SNAPPY_GIT_UNPACKED)
=====================================
Makefile.common
=====================================
@@ -115,7 +115,8 @@ ifeq ($(IBM_JDK_7),)
else
Linux-ppc64le_CXXFLAGS := -include $(IBM_JDK_LIB)/jni_md.h -include $(IBM_JDK_LIB)/jniport.h -I$(JAVA_HOME)/include -I$(JAVA_HOME)/include/linux -O2 -fPIC -m64
endif
-Linux-ppc64le_LINKFLAGS := -shared -static-libgcc -static-libstdc++
+# The ppc64le target GLIBC is 2.17, so disable the __tls_get_addr_opt optimization, which depends on GLIBC 2.22.
+Linux-ppc64le_LINKFLAGS := -shared -static-libgcc -static-libstdc++ -Wl,--no-tls-optimize,--no-tls-get-addr-optimize
Linux-ppc64le_LIBNAME := libsnappyjava.so
Linux-ppc64le_SNAPPY_FLAGS :=
=====================================
Milestone.md
=====================================
@@ -1,5 +1,14 @@
Since version 1.1.0.x, Java 6 (1.6) or higher is required.
+## snappy-java-1.1.7.4 (2020-05-05)
+ * Caching internal buffers for SnappyFramed streams [#234](https://github.com/xerial/snappy-java/pull/234)
+ * Fixed the native lib for ppc64le to work with glibc 2.17 (Previously it depended on 2.22)
+
+## snappy-java-1.1.7.3 (2019-03-25)
+ * Minor release
+ * Output the snappy header even for the empty input to address Spark's [issue](https://issues.apache.org/jira/browse/SPARK-27267)
+ * Fixed SnappyFramed stream to support Java 9
+
## snappy-java-1.1.7.2 (2018-05-21)
* Fix for aarch64 endian issue
=====================================
build.sbt
=====================================
@@ -55,7 +55,7 @@ pomExtra := {
</scm>
}
-scalaVersion in ThisBuild := "2.12.4"
+scalaVersion in ThisBuild := "2.12.8"
javacOptions in (Compile, compile) ++= Seq("-encoding", "UTF-8", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "1.7", "-target", "1.7")
@@ -111,7 +111,7 @@ enablePlugins(SbtOsgi)
osgiSettings
-OsgiKeys.exportPackage := Seq("org.xerial.snappy", "org.xerial.snappy.buffer")
+OsgiKeys.exportPackage := Seq("org.xerial.snappy", "org.xerial.snappy.buffer", "org.xerial.snappy.pool")
OsgiKeys.bundleSymbolicName := "org.xerial.snappy.snappy-java"
OsgiKeys.bundleActivator := Option("org.xerial.snappy.SnappyBundleActivator")
OsgiKeys.importPackage := Seq("""org.osgi.framework;version="[1.5,2)"""")
=====================================
project/build.properties
=====================================
@@ -1,2 +1,2 @@
-sbt.version=1.0.3
+sbt.version=1.2.8
=====================================
project/plugins.sbt
=====================================
@@ -1,8 +1,8 @@
-addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.6")
-addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0")
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
+addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.11")
+addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0-M2")
addSbtPlugin("com.github.sbt" % "sbt-findbugs" % "2.0.0")
-addSbtPlugin("com.github.sbt" % "sbt-jacoco" % "3.0.3")
-addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.9.2")
-addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.0-RC13")
-addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.3.0")
+addSbtPlugin("com.github.sbt" % "sbt-jacoco" % "3.1.0")
+addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.9.4")
+addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.1.0-M7")
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")
=====================================
src/main/java/org/xerial/snappy/SnappyFramed.java
=====================================
@@ -3,24 +3,9 @@
*/
package org.xerial.snappy;
-import static java.lang.invoke.MethodHandles.constant;
-import static java.lang.invoke.MethodHandles.dropArguments;
-import static java.lang.invoke.MethodHandles.filterReturnValue;
-import static java.lang.invoke.MethodHandles.guardWithTest;
-import static java.lang.invoke.MethodHandles.lookup;
-import static java.lang.invoke.MethodType.methodType;
-
import java.io.IOException;
-import java.lang.invoke.MethodHandle;
-import java.lang.invoke.MethodHandles.Lookup;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
-import java.security.AccessController;
-import java.security.PrivilegedExceptionAction;
-import java.util.logging.Level;
-import java.util.logging.Logger;
/**
* Constants and utilities for implementing x-snappy-framed.
@@ -38,82 +23,6 @@ final class SnappyFramed
private static final int MASK_DELTA = 0xa282ead8;
- /**
- * Sun specific mechanisms to clean up resources associated with direct byte buffers.
- */
- @SuppressWarnings("unchecked")
- static final Class<? extends ByteBuffer> DIRECT_BUFFER_CLAZZ = (Class<? extends ByteBuffer>) lookupClassQuietly("java.nio.DirectByteBuffer");
-
- static final MethodHandle CLEAN_HANDLE;
-
- static {
- // this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989
- // and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
- MethodHandle cleanHandle = null;
- try {
- final PrivilegedExceptionAction<MethodHandle> action = new PrivilegedExceptionAction<MethodHandle>() {
-
- @Override
- public MethodHandle run() throws Exception {
- MethodHandle handle = null;
- if (DIRECT_BUFFER_CLAZZ != null) {
- final Lookup lookup = lookup();
-
- try {
- // sun.misc.Unsafe unmapping (Java 9+)
- final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
- // first check if Unsafe has the right method, otherwise we can give up
- // without doing any security critical stuff:
- final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class));
- // fetch the unsafe instance and bind it to the virtual MH:
- final Field f = unsafeClass.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- final Object theUnsafe = f.get(null);
- handle = unmapper.bindTo(theUnsafe);
- } catch (Exception e) {
- Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e);
-
- // sun.misc.Cleaner unmapping (Java 8 and older)
- final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner");
- m.setAccessible(true);
- final MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
- final Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
-
- /*
- * "Compile" a MethodHandle that basically is equivalent to the following code:
- * void unmapper(ByteBuffer byteBuffer)
- * {
- * sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
- * if (nonNull(cleaner))
- * {
- * cleaner.clean();
- * }
- * else
- * {
- * // the noop is needed because MethodHandles#guardWithTest always needs ELSE
- * noop(cleaner);
- * }
- * }
- */
- final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
- final MethodHandle nonNullTest = lookup.findStatic(SnappyFramed.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
- final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
- handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class));
- }
- }
-
- return handle;
- }
- };
-
- cleanHandle = AccessController.doPrivileged(action);
-
- } catch (Throwable t) {
- Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t);
- }
- CLEAN_HANDLE = cleanHandle;
- }
-
/**
* The header consists of the stream identifier flag, 3 bytes indicating a
* length of 6, and "sNaPpY" in ASCII.
@@ -213,52 +122,4 @@ final class SnappyFramed
buffer.clear();
return skip - toSkip;
}
-
- private static Class<?> lookupClassQuietly(String name)
- {
- try {
- return SnappyFramed.class.getClassLoader().loadClass(name);
- }
- catch (Throwable t) {
- Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t);
- }
-
- return null;
- }
-
- /**
- * Provides jvm implementation specific operation to aggressively release resources associated with <i>buffer</i>.
- *
- * @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}.
- */
- static void releaseDirectByteBuffer(final ByteBuffer buffer)
- {
- assert buffer != null && buffer.isDirect();
-
- if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) {
- try {
- final PrivilegedExceptionAction<Void> pea = new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- try {
- CLEAN_HANDLE.invokeExact(buffer);
- } catch (Exception e) {
- throw e;
- } catch (Throwable t) {
- //this will be an error
- throw new RuntimeException(t);
- }
- return null;
- }
- };
- AccessController.doPrivileged(pea);
- } catch (Throwable t) {
- Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t);
- }
- }
- }
-
- static boolean nonNull(Object o) {
- return o != null;
- }
}
=====================================
src/main/java/org/xerial/snappy/SnappyFramedInputStream.java
=====================================
@@ -9,7 +9,6 @@ import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.readBytes;
-import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE;
import java.io.EOFException;
@@ -23,6 +22,9 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
+import org.xerial.snappy.pool.BufferPool;
+import org.xerial.snappy.pool.DefaultPoolFactory;
+
/**
* Implements the <a
* href="http://snappy.googlecode.com/svn/trunk/framing_format.txt"
@@ -41,6 +43,7 @@ public final class SnappyFramedInputStream
private final ReadableByteChannel rbc;
private final ByteBuffer frameHeader;
private final boolean verifyChecksums;
+ private final BufferPool bufferPool;
/**
* A single frame read from the underlying {@link InputStream}.
@@ -77,29 +80,67 @@ public final class SnappyFramedInputStream
*/
private byte[] buffer;
+ /**
+ * Creates a Snappy input stream to read data from the specified underlying
+ * input stream.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
+ *
+ * @param in the underlying input stream. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedInputStream(InputStream in)
+ throws IOException
+ {
+ this(in, true, DefaultPoolFactory.getDefaultPool());
+ }
+
/**
* Creates a Snappy input stream to read data from the specified underlying
* input stream.
*
* @param in the underlying input stream. Must not be {@code null}.
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
*/
- public SnappyFramedInputStream(InputStream in)
+ public SnappyFramedInputStream(InputStream in, BufferPool bufferPool)
throws IOException
{
- this(in, true);
+ this(in, true, bufferPool);
}
/**
* Creates a Snappy input stream to read data from the specified underlying
* input stream.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
*
* @param in the underlying input stream. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
+ * @throws IOException
*/
public SnappyFramedInputStream(InputStream in, boolean verifyChecksums)
throws IOException
{
- this(Channels.newChannel(in), verifyChecksums);
+ this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a Snappy input stream to read data from the specified underlying
+ * input stream.
+ *
+ * @param in the underlying input stream. Must not be {@code null}.
+ * @param verifyChecksums if true, checksums in input stream will be verified
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedInputStream(InputStream in, boolean verifyChecksums,
+ BufferPool bufferPool)
+ throws IOException
+ {
+ this(Channels.newChannel(in), verifyChecksums, bufferPool);
}
/**
@@ -107,6 +148,24 @@ public final class SnappyFramedInputStream
* channel.
*
* @param in the underlying readable channel. Must not be {@code null}.
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedInputStream(ReadableByteChannel in, BufferPool bufferPool)
+ throws IOException
+ {
+ this(in, true, bufferPool);
+ }
+
+ /**
+ * Creates a Snappy input stream to read data from the specified underlying
+ * channel.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
+ *
+ * @param in the underlying readable channel. Must not be {@code null}.
+ * @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in)
throws IOException
@@ -117,18 +176,43 @@ public final class SnappyFramedInputStream
/**
* Creates a Snappy input stream to read data from the specified underlying
* channel.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
*
* @param in the underlying readable channel. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
+ * @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in,
boolean verifyChecksums)
throws IOException
+ {
+ this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a Snappy input stream to read data from the specified underlying
+ * channel.
+ *
+ * @param in the underlying readable channel. Must not be {@code null}.
+ * @param verifyChecksums if true, checksums in input stream will be verified
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedInputStream(ReadableByteChannel in,
+ boolean verifyChecksums, BufferPool bufferPool)
+ throws IOException
{
if (in == null) {
throw new NullPointerException("in is null");
}
+ if (bufferPool == null) {
+ throw new NullPointerException("bufferPool is null");
+ }
+
+ this.bufferPool = bufferPool;
this.rbc = in;
this.verifyChecksums = verifyChecksums;
@@ -155,19 +239,22 @@ public final class SnappyFramedInputStream
*/
private void allocateBuffersBasedOnSize(int size)
{
-
if (input != null) {
- releaseDirectByteBuffer(input);
+ bufferPool.releaseDirect(input);
}
if (uncompressedDirect != null) {
- releaseDirectByteBuffer(uncompressedDirect);
+ bufferPool.releaseDirect(uncompressedDirect);
}
- input = ByteBuffer.allocateDirect(size);
+ if (buffer != null) {
+ bufferPool.releaseArray(buffer);
+ }
+
+ input = bufferPool.allocateDirect(size);
final int maxCompressedLength = Snappy.maxCompressedLength(size);
- uncompressedDirect = ByteBuffer.allocateDirect(maxCompressedLength);
- buffer = new byte[maxCompressedLength];
+ uncompressedDirect = bufferPool.allocateDirect(maxCompressedLength);
+ buffer = bufferPool.allocateArray(maxCompressedLength);
}
@Override
@@ -359,14 +446,21 @@ public final class SnappyFramedInputStream
finally {
if (!closed) {
closed = true;
- }
- if (input != null) {
- releaseDirectByteBuffer(input);
- }
+ if (input != null) {
+ bufferPool.releaseDirect(input);
+ input = null;
+ }
+
+ if (uncompressedDirect != null) {
+ bufferPool.releaseDirect(uncompressedDirect);
+ uncompressedDirect = null;
+ }
- if (uncompressedDirect != null) {
- releaseDirectByteBuffer(uncompressedDirect);
+ if (buffer != null) {
+ bufferPool.releaseArray(buffer);
+ buffer = null;
+ }
}
}
}
@@ -456,9 +550,10 @@ public final class SnappyFramedInputStream
final int uncompressedLength = Snappy.uncompressedLength(input);
if (uncompressedLength > uncompressedDirect.capacity()) {
- uncompressedDirect = ByteBuffer
- .allocateDirect(uncompressedLength);
- buffer = new byte[Math.max(input.capacity(), uncompressedLength)];
+ bufferPool.releaseDirect(uncompressedDirect);
+ bufferPool.releaseArray(buffer);
+ uncompressedDirect = bufferPool.allocateDirect(uncompressedLength);
+ buffer = bufferPool.allocateArray(uncompressedLength);
}
uncompressedDirect.clear();
=====================================
src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java
=====================================
@@ -7,7 +7,6 @@ import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
-import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import java.io.IOException;
import java.io.InputStream;
@@ -19,6 +18,9 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import org.xerial.snappy.pool.BufferPool;
+import org.xerial.snappy.pool.DefaultPoolFactory;
+
/**
* Implements the <a
* href="http://snappy.googlecode.com/svn/trunk/framing_format.txt"
@@ -59,6 +61,8 @@ public final class SnappyFramedOutputStream
private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
+ private final BufferPool bufferPool;
+ private final int blockSize;
private final ByteBuffer buffer;
private final ByteBuffer directInputBuffer;
private final ByteBuffer outputBuffer;
@@ -72,6 +76,9 @@ public final class SnappyFramedOutputStream
/**
* Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
* and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
@@ -80,11 +87,29 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(OutputStream out)
throws IOException
{
- this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
+ this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
+ * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
+ *
+ * @param out The underlying {@link OutputStream} to write to. Must not be
+ * {@code null}.
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)
+ throws IOException
+ {
+ this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
}
/**
* Creates a new {@link SnappyFramedOutputStream} instance.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
@@ -99,12 +124,35 @@ public final class SnappyFramedOutputStream
double minCompressionRatio)
throws IOException
{
- this(Channels.newChannel(out), blockSize, minCompressionRatio);
+ this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a new {@link SnappyFramedOutputStream} instance.
+ *
+ * @param out The underlying {@link OutputStream} to write to. Must not be
+ * {@code null}.
+ * @param blockSize The block size (of raw data) to compress before writing frames
+ * to <i>out</i>. Must be in (0, 65536].
+ * @param minCompressionRatio Defines the minimum compression ratio (
+ * {@code compressedLength / rawLength}) that must be achieved to
+ * write the compressed data. This must be in (0, 1.0].
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedOutputStream(OutputStream out, int blockSize,
+ double minCompressionRatio, BufferPool bufferPool)
+ throws IOException
+ {
+ this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool);
}
/**
* Creates a new {@link SnappyFramedOutputStream} using the
* {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
+ * <p>
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
+ * </p>
*
* @param out The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
@@ -114,7 +162,25 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(WritableByteChannel out)
throws IOException
{
- this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
+ this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a new {@link SnappyFramedOutputStream} using the
+ * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
+ * <p>
+ * Buffers are obtained from the supplied <i>bufferPool</i> rather than from {@link DefaultPoolFactory}.
+ * </p>
+ *
+ * @param out The underlying {@link WritableByteChannel} to write to. Must
+ * not be {@code null}.
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ */
+ public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)
+ throws IOException
+ {
+ this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
}
/**
@@ -133,9 +199,34 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
double minCompressionRatio)
throws IOException
+ {
+ this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
+ }
+
+ /**
+ * Creates a new {@link SnappyFramedOutputStream} instance.
+ *
+ * @param out The underlying {@link WritableByteChannel} to write to. Must
+ * not be {@code null}.
+ * @param blockSize The block size (of raw data) to compress before writing frames
+ * to <i>out</i>. Must be in (0, 65536].
+ * @param minCompressionRatio Defines the minimum compression ratio (
+ * {@code compressedLength / rawLength}) that must be achieved to
+ * write the compressed data. This must be in (0, 1.0].
+ * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
+ * @throws IOException
+ * @since 1.1.1
+ */
+ public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
+ double minCompressionRatio, BufferPool bufferPool)
+ throws IOException
{
if (out == null) {
- throw new NullPointerException();
+ throw new NullPointerException("out is null");
+ }
+
+ if (bufferPool == null) {
+ throw new NullPointerException("buffer pool is null");
}
if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) {
@@ -147,12 +238,14 @@ public final class SnappyFramedOutputStream
throw new IllegalArgumentException("block size " + blockSize
+ " must be in (0, 65536]");
}
-
+ this.blockSize = blockSize;
this.out = out;
this.minCompressionRatio = minCompressionRatio;
- buffer = ByteBuffer.allocate(blockSize);
- directInputBuffer = ByteBuffer.allocateDirect(blockSize);
- outputBuffer = ByteBuffer.allocateDirect(Snappy
+
+ this.bufferPool = bufferPool;
+ buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize);
+ directInputBuffer = bufferPool.allocateDirect(blockSize);
+ outputBuffer = bufferPool.allocateDirect(Snappy
.maxCompressedLength(blockSize));
writeHeader(out);
@@ -370,9 +463,9 @@ public final class SnappyFramedOutputStream
}
finally {
closed = true;
-
- releaseDirectByteBuffer(directInputBuffer);
- releaseDirectByteBuffer(outputBuffer);
+ bufferPool.releaseArray(buffer.array());
+ bufferPool.releaseDirect(directInputBuffer);
+ bufferPool.releaseDirect(outputBuffer);
}
}
@@ -389,6 +482,7 @@ public final class SnappyFramedOutputStream
buffer.flip();
writeCompressed(buffer);
buffer.clear();
+ buffer.limit(blockSize);
}
}
=====================================
src/main/java/org/xerial/snappy/pool/BufferPool.java
=====================================
@@ -0,0 +1,53 @@
+package org.xerial.snappy.pool;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Makes various types of buffers available for use and potential re-use.
+ *
+ * <p>
+ * Implementations must be safe for concurrent use by multiple threads.
+ * </p>
+ *
+ * @author Brett Okken
+ */
+public interface BufferPool {
+
+ /**
+ * Returns a {@code byte[]} of <i>size</i> or greater length.
+ * @param size The minimum size array required. Must be {@code >= 0}.
+ * @return A {@code byte[]} with length of at least <i>size</i>.
+ * @see #releaseArray(byte[])
+ */
+ public byte[] allocateArray(int size);
+
+ /**
+ * Returns instance to pool for potential future reuse.
+ * <p>
+ * Must not be returned more than 1 time. Must not be used by caller after return.
+ * </p>
+ * @param buffer Instance to return to pool. Must not be {@code null}.
+ * Must not be returned more than 1 time. Must not be used by caller after return.
+ */
+ public void releaseArray(byte[] buffer);
+
+ /**
+ * Returns a {@link ByteBuffer#allocateDirect(int) direct ByteBuffer} of <i>size</i> or
+ * greater {@link ByteBuffer#capacity() capacity}.
+ * @param size The minimum size buffer required. Must be {@code >= 0}.
+ * @return A {@code ByteBuffer} of <i>size</i> or greater {@link ByteBuffer#capacity() capacity}.
+ * @see #releaseDirect(ByteBuffer)
+ * @see ByteBuffer#allocateDirect(int)
+ */
+ public ByteBuffer allocateDirect(int size);
+
+ /**
+ * Returns instance to pool for potential future reuse.
+ * <p>
+ * Must not be returned more than 1 time. Must not be used by caller after return.
+ * </p>
+ * @param buffer Instance to return to pool. Must not be {@code null}.
+ * Must not be returned more than 1 time. Must not be used by caller after return.
+ */
+ public void releaseDirect(ByteBuffer buffer);
+}
=====================================
src/main/java/org/xerial/snappy/pool/CachingBufferPool.java
=====================================
@@ -0,0 +1,216 @@
+package org.xerial.snappy.pool;
+
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A {@link BufferPool} implementation which caches values at fixed sizes.
+ * <p>
+ * Pooled instances are held as {@link SoftReference} to allow GC if necessary.
+ * </p>
+ * <p>
+ * The current fixed sizes are calculated as follows:
+ * <ul>
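+ * <li>Values &lt; 4KB return 4KB</li>
+ * <li>4KB - 32KB are rounded up to a multiple of 2KB</li>
+ * <li>32KB - 512KB are rounded up to a multiple of 16KB</li>
+ * <li>512KB - 2MB are rounded up to a multiple of 128KB</li>
+ * <li>2MB - 16MB are rounded up to a multiple of 512KB</li>
+ * <li>16MB - 128MB are rounded up to a multiple of 4MB</li>
+ * <li>128MB - 512MB are rounded up to a multiple of 16MB</li>
+ * <li>512MB - 1.5GB are rounded up to a multiple of 128MB</li>
+ * <li>Values &gt; 1.5GB return {@link Integer#MAX_VALUE}</li>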
+ * </ul>
+ * </p>
+ * @author Brett Okken
+ */
+public final class CachingBufferPool implements BufferPool {
+
+ private static interface IntFunction<E> {
+ public E create(int size);
+ }
+
+ private static final IntFunction<byte[]> ARRAY_FUNCTION = new IntFunction<byte[]>() {
+ @Override
+ public byte[] create(int size) {
+ return new byte[size];
+ }
+ };
+
+ private static final IntFunction<ByteBuffer> DBB_FUNCTION = new IntFunction<ByteBuffer>() {
+ @Override
+ public ByteBuffer create(int size) {
+ return ByteBuffer.allocateDirect(size);
+ }
+ };
+
+ private static final CachingBufferPool INSTANCE = new CachingBufferPool();
+
+ private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<byte[]>>> bytes = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<ByteBuffer>>> buffers = new ConcurrentHashMap<>();
+
+ private CachingBufferPool() {
+ }
+
+ /**
+ * Returns instance of {@link CachingBufferPool} for using cached buffers.
+ * @return instance of {@link CachingBufferPool} for using cached buffers.
+ */
+ public static BufferPool getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] allocateArray(int size) {
+ if (size <= 0) {
+ throw new IllegalArgumentException("size is invalid: " + size);
+ }
+
+ return getOrCreate(size, bytes, ARRAY_FUNCTION);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void releaseArray(byte[] buffer) {
+ if (buffer == null) {
+ throw new IllegalArgumentException("buffer is null");
+ }
+ returnValue(buffer, buffer.length, bytes);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ByteBuffer allocateDirect(int size) {
+ if (size <= 0) {
+ throw new IllegalArgumentException("size is invalid: " + size);
+ }
+
+ return getOrCreate(size, buffers, DBB_FUNCTION);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void releaseDirect(ByteBuffer buffer) {
+ if (buffer == null) {
+ throw new IllegalArgumentException("buffer is null");
+ }
+ buffer.clear();
+ returnValue(buffer, buffer.capacity(), buffers);
+ }
+
+ private static <E> E getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator) {
+ assert size > 0;
+ final int adjustedSize = adjustSize(size);
+ final ConcurrentLinkedDeque<SoftReference<E>> queue = optimisticGetEntry(adjustedSize, map);
+ SoftReference<E> entry;
+ while ((entry = queue.pollFirst()) != null) {
+ final E val = entry.get();
+ if (val != null) {
+ return val;
+ }
+ }
+
+ return creator.create(adjustedSize);
+ }
+
+ /*
+ * This is package scope to allow direct unit testing.
+ */
+ static int adjustSize(int size) {
+ assert size > 0;
+
+ switch (Integer.numberOfLeadingZeros(size)) {
+ case 1: // 1GB - 2GB
+ case 2: // 512MB
+ //if 512MB - 1.5 GB round to nearest 128 MB (2^27), else Integer.MAX_VALUE
+ return size <= 0x6000_0000 ? roundToPowers(size, 27) : Integer.MAX_VALUE;
+ case 3: //256MB
+ case 4: //128MB
+ //if 128MB - 512MB, round to nearest 16 MB
+ return roundToPowers(size, 24);
+ case 5: // 64MB
+ case 6: // 32MB
+ case 7: // 16MB
+ //if 16MB - 128MB, round to nearest 4MB
+ return roundToPowers(size, 22);
+ case 8: // 8MB
+ case 9: // 4MB
+ case 10: // 2MB
+ //if 2MB - 16MB, round to nearest 512KB
+ return roundToPowers(size, 19);
+ case 11: // 1MB
+ case 12: //512KB
+ //if 512KB - 2MB, round to nearest 128KB
+ return roundToPowers(size, 17);
+ case 13: //256KB
+ case 14: //128KB
+ case 15: // 64KB
+ case 16: // 32KB
+ //if 32KB to 512KB, round to nearest 16KB
+ return roundToPowers(size, 14);
+ case 17: // 16KB
+ case 18: // 8KB
+ case 19: // 4KB
+ // if 4KB - 32KB, round to nearest 2KB
+ return roundToPowers(size, 11);
+ default:
+ return 4 * 1024;
+ }
+ }
+
+ private static int roundToPowers(int number, int bits) {
+ final int mask = (0x7FFF_FFFF >> bits) << bits;
+ final int floor = number & mask;
+ return floor == number ? number : floor + (1 << bits);
+ }
+
+ private static <E> ConcurrentLinkedDeque<SoftReference<E>> optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
+ ConcurrentLinkedDeque<SoftReference<E>> val = map.get(key);
+ if (val == null) {
+ map.putIfAbsent(key, new ConcurrentLinkedDeque<SoftReference<E>>());
+ val = map.get(key);
+ }
+ return val;
+ }
+
+ private static <E> void returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
+ final ConcurrentLinkedDeque<SoftReference<E>> queue = map.get(size);
+ //no queue will exist if buffer was not originally obtained from this class
+ if (queue != null) {
+ //push this value onto deque first so that concurrent request can use it
+ queue.addFirst(new SoftReference<E>(value));
+
+ //purge the oldest entries whose referents have already been garbage collected
+ SoftReference<E> entry;
+ boolean lastEmpty = true;
+ while(lastEmpty && (entry = queue.peekLast()) != null) {
+ if (entry.get() == null) {
+ queue.removeLastOccurrence(entry);
+ } else {
+ lastEmpty = false;
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return "CachingBufferPool [bytes=" + this.bytes + ", buffers=" + this.buffers + "]";
+ }
+}
+
=====================================
src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java
=====================================
@@ -0,0 +1,37 @@
+package org.xerial.snappy.pool;
+
+/**
+ * Holds the {@link BufferPool} implementation which is used by default. When the system property {@link #DISABLE_CACHING_PROPERTY} is
+ * {@code true} at the time this class is initialized, the default is the {@link QuiescentBufferPool}; in every other case it is the
+ * {@link CachingBufferPool}. The choice can be replaced at any time through {@link #setDefaultPool(BufferPool)}.
+ */
+public final class DefaultPoolFactory {
+
+    /**
+     * Name of the system property which, when set to {@code true} (ignoring case), disables use of {@link CachingBufferPool} by default.
+     */
+    public static final String DISABLE_CACHING_PROPERTY = "org.xerial.snappy.pool.disable";
+
+    private static volatile BufferPool defaultPool = initialPool();
+
+    /**
+     * Picks the pool to start with, based on the value of {@link #DISABLE_CACHING_PROPERTY}.
+     */
+    private static BufferPool initialPool() {
+        final String disableFlag = System.getProperty(DISABLE_CACHING_PROPERTY);
+        if ("true".equalsIgnoreCase(disableFlag)) {
+            return QuiescentBufferPool.getInstance();
+        }
+        return CachingBufferPool.getInstance();
+    }
+
+    /**
+     * @return The instance currently configured as the default.
+     */
+    public static BufferPool getDefaultPool() {
+        return defaultPool;
+    }
+
+    /**
+     * Replaces the default instance.
+     * @param pool The instance to use by default from now on. Must not be {@code null}.
+     * @throws IllegalArgumentException If <i>pool</i> is {@code null}.
+     * @see #getDefaultPool()
+     */
+    public static void setDefaultPool(BufferPool pool) {
+        if (pool != null) {
+            defaultPool = pool;
+            return;
+        }
+        throw new IllegalArgumentException("pool is null");
+    }
+}
=====================================
src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java
=====================================
@@ -0,0 +1,150 @@
+package org.xerial.snappy.pool;
+
+import static java.lang.invoke.MethodHandles.constant;
+import static java.lang.invoke.MethodHandles.dropArguments;
+import static java.lang.invoke.MethodHandles.filterReturnValue;
+import static java.lang.invoke.MethodHandles.guardWithTest;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.lang.invoke.MethodType.methodType;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles.Lookup;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility to facilitate disposing of direct byte buffer instances.
+ * <p>
+ * A JVM specific cleaning mechanism is resolved once, while this class is initialized, and exposed through
+ * {@link #releaseDirectByteBuffer(ByteBuffer)}. Every failure to resolve or to invoke that mechanism is logged at {@link Level#FINE}
+ * and otherwise ignored, in which case releasing a buffer does nothing.
+ */
+final class DirectByteBuffers {
+
+    /**
+     * Logger shared by all methods. Declared first so that it is initialized before the static state below, whose initialization may log.
+     */
+    private static final Logger LOGGER = Logger.getLogger(DirectByteBuffers.class.getName());
+
+    /**
+     * The {@code java.nio.DirectByteBuffer} class of the running JVM, or {@code null} if it could not be loaded.
+     */
+    @SuppressWarnings("unchecked")
+    static final Class<? extends ByteBuffer> DIRECT_BUFFER_CLAZZ = (Class<? extends ByteBuffer>) lookupClassQuietly("java.nio.DirectByteBuffer");
+
+    /**
+     * Handle of type {@code (ByteBuffer)void} which invokes the Sun specific cleaner of a direct buffer, or {@code null} if no such
+     * mechanism could be resolved.
+     */
+    static final MethodHandle CLEAN_HANDLE;
+
+    static {
+        // this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989
+        // and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
+        MethodHandle cleanHandle = null;
+        try {
+            final PrivilegedExceptionAction<MethodHandle> action = new PrivilegedExceptionAction<MethodHandle>() {
+
+                @Override
+                public MethodHandle run() throws Exception {
+                    MethodHandle handle = null;
+                    if (DIRECT_BUFFER_CLAZZ != null) {
+                        final Lookup lookup = lookup();
+
+                        try {
+                            // sun.misc.Unsafe unmapping (Java 9+)
+                            final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+                            // first check if Unsafe has the right method, otherwise we can give up
+                            // without doing any security critical stuff:
+                            final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class));
+                            // fetch the unsafe instance and bind it to the virtual MH:
+                            final Field f = unsafeClass.getDeclaredField("theUnsafe");
+                            f.setAccessible(true);
+                            final Object theUnsafe = f.get(null);
+                            handle = unmapper.bindTo(theUnsafe);
+                        } catch (Exception e) {
+                            LOGGER.log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e);
+
+                            // sun.misc.Cleaner unmapping (Java 8 and older)
+                            final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner");
+                            m.setAccessible(true);
+                            final MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
+                            final Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
+
+                            /*
+                             * "Compile" a MethodHandle that basically is equivalent to the following code:
+                             *  void unmapper(ByteBuffer byteBuffer)
+                             *  {
+                             *      sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
+                             *      if (nonNull(cleaner))
+                             *      {
+                             *          cleaner.clean();
+                             *      }
+                             *      else
+                             *      {
+                             *          // the noop is needed because MethodHandles#guardWithTest always needs ELSE
+                             *          noop(cleaner);
+                             *      }
+                             *  }
+                             */
+                            final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
+                            final MethodHandle nonNullTest = lookup.findStatic(DirectByteBuffers.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
+                            final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
+                            handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class));
+                        }
+                    }
+
+                    return handle;
+                }
+            };
+
+            cleanHandle = AccessController.doPrivileged(action);
+
+        } catch (Throwable t) {
+            // best effort only: without a handle releaseDirectByteBuffer simply does nothing
+            LOGGER.log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t);
+        }
+        CLEAN_HANDLE = cleanHandle;
+    }
+
+
+    /**
+     * Loads the class named <i>name</i> without initializing it, logging instead of propagating any failure.
+     *
+     * @param name The fully qualified name of the class to load.
+     * @return The class, or {@code null} if it could not be loaded.
+     */
+    private static Class<?> lookupClassQuietly(String name)
+    {
+        try {
+            // getClassLoader() returns null when this class was loaded by the bootstrap loader; Class.forName accepts
+            // a null loader (meaning the bootstrap loader), whereas calling loadClass on it would throw a NullPointerException
+            return Class.forName(name, false, DirectByteBuffers.class.getClassLoader());
+        }
+        catch (Throwable t) {
+            LOGGER.log(Level.FINE, "Did not find requested class: " + name, t);
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Null check in a form which can be looked up as a {@link MethodHandle}; used as the guard of {@link #CLEAN_HANDLE}.
+     *
+     * @param o The reference to test.
+     * @return {@code true} if <i>o</i> is not {@code null}.
+     */
+    static boolean nonNull(Object o) {
+        return o != null;
+    }
+
+    /**
+     * Provides jvm implementation specific operation to aggressively release resources associated with <i>buffer</i>.
+     * <p>
+     * Does nothing if no cleaning mechanism was resolved or if <i>buffer</i> is not an instance of {@link #DIRECT_BUFFER_CLAZZ}.
+     * Anything thrown while cleaning is logged at {@link Level#FINE} and not propagated.
+     *
+     * @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}.
+     */
+    public static void releaseDirectByteBuffer(final ByteBuffer buffer)
+    {
+        assert buffer != null && buffer.isDirect();
+
+        // CLEAN_HANDLE is only ever non-null when DIRECT_BUFFER_CLAZZ was found, so the second operand cannot dereference null
+        if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) {
+            try {
+                final PrivilegedExceptionAction<Void> pea = new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                        try {
+                            CLEAN_HANDLE.invokeExact(buffer);
+                        } catch (Exception e) {
+                            throw e;
+                        } catch (Throwable t) {
+                            //this will be an error
+                            throw new RuntimeException(t);
+                        }
+                        return null;
+                    }
+                };
+                AccessController.doPrivileged(pea);
+            } catch (Throwable t) {
+                LOGGER.log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t);
+            }
+        }
+    }
+}
=====================================
src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java
=====================================
@@ -0,0 +1,55 @@
+package org.xerial.snappy.pool;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link BufferPool} which retains nothing: every allocation request produces a brand new instance and nothing which is handed back
+ * is kept for reuse.
+ * @author Brett Okken
+ */
+public final class QuiescentBufferPool implements BufferPool {
+
+    private static final QuiescentBufferPool INSTANCE = new QuiescentBufferPool();
+
+    private QuiescentBufferPool() {
+        //single shared instance, obtained through getInstance()
+    }
+
+    /**
+     * @return The shared {@link BufferPool} which does no caching/reuse of instances.
+     */
+    public static BufferPool getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Always creates a new {@code byte[]} of length <i>size</i>.
+     */
+    @Override
+    public byte[] allocateArray(int size) {
+        final byte[] fresh = new byte[size];
+        return fresh;
+    }
+
+    /**
+     * No-op: <i>buffer</i> is not retained.
+     */
+    @Override
+    public void releaseArray(byte[] buffer) {
+        //nothing is cached, so there is nothing to do
+    }
+
+    /**
+     * Always {@link ByteBuffer#allocateDirect(int) allocates} a new direct {@link ByteBuffer} of capacity <i>size</i>.
+     */
+    @Override
+    public ByteBuffer allocateDirect(int size) {
+        final ByteBuffer fresh = ByteBuffer.allocateDirect(size);
+        return fresh;
+    }
+
+    /**
+     * Hands <i>buffer</i> to {@link DirectByteBuffers#releaseDirectByteBuffer(ByteBuffer)} so that its native resources are released
+     * aggressively instead of being pooled.
+     */
+    @Override
+    public void releaseDirect(ByteBuffer buffer) {
+        assert buffer != null && buffer.isDirect();
+        DirectByteBuffers.releaseDirectByteBuffer(buffer);
+    }
+
+}
=====================================
src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java
=====================================
@@ -0,0 +1,184 @@
+package org.xerial.snappy.pool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+
+public class CachingBufferPoolTest { // exercises CachingBufferPool: size rounding, reuse of released buffers, and clearing of pooled references under memory pressure
+
+ private static final int LIST_COUNT = 2048; // upper bound on the number of 10MB arrays forceOOMEGC is asked to allocate
+
+ @Test
+ public void testAdjustSize() { // pins the value CachingBufferPool.adjustSize is expected to return for representative request sizes
+ assertEquals(4 * 1024, CachingBufferPool.adjustSize(2)); // requests of 4KB or less are expected to yield 4KB
+ assertEquals(4 * 1024, CachingBufferPool.adjustSize(1023));
+ assertEquals(4 * 1024, CachingBufferPool.adjustSize(1024));
+ assertEquals(4 * 1024, CachingBufferPool.adjustSize(1025));
+ assertEquals(4 * 1024, CachingBufferPool.adjustSize(4 * 1024));
+ assertEquals((4 + 2) * 1024, CachingBufferPool.adjustSize((4 * 1024) + 1)); // just above 4KB: next 2KB boundary
+ assertEquals(6 * 1024, CachingBufferPool.adjustSize(5 * 1024));
+ assertEquals(6 * 1024, CachingBufferPool.adjustSize((5 * 1024) + 1));
+
+ assertEquals(32 * 1024, CachingBufferPool.adjustSize(32 * 1024));
+ assertEquals((32 + 16) * 1024, CachingBufferPool.adjustSize((32 * 1024) + 1)); // just above 32KB: next 16KB boundary
+
+ assertEquals(2 * 1024 * 1024, CachingBufferPool.adjustSize(2 * 1024 * 1024));
+ assertEquals(((2 * 1024) + 512) * 1024, CachingBufferPool.adjustSize((2 * 1024 * 1024) + 1)); // just above 2MB: next 512KB boundary
+
+ assertEquals(16 * 1024 * 1024, CachingBufferPool.adjustSize(16 * 1024 * 1024));
+ assertEquals((16 + 4) * 1024 * 1024, CachingBufferPool.adjustSize((16 * 1024 * 1024) + 1)); // just above 16MB: next 4MB boundary
+
+ assertEquals(128 * 1024 * 1024, CachingBufferPool.adjustSize(128 * 1024 * 1024));
+ assertEquals((128 + 16) * 1024 * 1024, CachingBufferPool.adjustSize((128 * 1024 * 1024) + 1)); // just above 128MB: next 16MB boundary
+
+ assertEquals(512 * 1024 * 1024, CachingBufferPool.adjustSize(512 * 1024 * 1024));
+ assertEquals((512 + 128) * 1024 * 1024, CachingBufferPool.adjustSize((512 * 1024 * 1024) + 1)); // just above 512MB: next 128MB boundary
+ assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000));
+ assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000 - 1));
+ assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(0x6000_0001)); // above 0x6000_0000 the expected result is Integer.MAX_VALUE
+ assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE));
+ assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE - 1));
+ }
+
+ @Test
+ public void testDirectByteBuffers() throws Exception { // a released direct buffer is expected to be handed out again, with its position reset
+
+ BufferPool pool = CachingBufferPool.getInstance();
+
+ ByteBuffer bb1 = pool.allocateDirect(12 * 1024);
+ assertNotNull(bb1);
+ assertEquals(12 * 1024, bb1.limit());
+ assertEquals(12 * 1024, bb1.capacity());
+ assertEquals(0, bb1.position());
+
+ ByteBuffer bb2 = pool.allocateDirect(12 * 1024); // bb1 is still checked out, so this must be a different buffer
+ assertNotNull(bb2);
+ assertEquals(12 * 1024, bb2.limit());
+ assertEquals(12 * 1024, bb2.capacity());
+ assertEquals(0, bb2.position());
+
+ assertNotSame(bb1, bb2);
+
+ bb2.position(18); // move the position so the assertions on bb3 can show it is back at 0 after reuse
+ pool.releaseDirect(bb2);
+
+ ByteBuffer bb3 = pool.allocateDirect(12 * 1024); // expected to be the buffer which was just released
+ assertNotNull(bb3);
+ assertEquals(12 * 1024, bb3.limit());
+ assertEquals(12 * 1024, bb3.capacity());
+ assertEquals(0, bb3.position());
+
+ assertNotSame(bb1, bb2); // NOTE(review): repeats the earlier check; bb1 vs bb3 may have been intended - confirm
+ assertSame(bb2, bb3);
+
+ pool.releaseDirect(bb1);
+
+ ByteBuffer bb4 = pool.allocateDirect((12 * 1024) - 1); // one byte less than 12KB is expected to be served by the released 12KB buffer
+ assertNotNull(bb4);
+ assertEquals(12 * 1024, bb4.limit());
+ assertEquals(12 * 1024, bb4.capacity());
+ assertEquals(0, bb4.position());
+
+ assertSame(bb1, bb4);
+ }
+
+ @Test
+ public void testArrays() throws Exception { // same reuse scenario as testDirectByteBuffers, for byte[] instances
+
+ BufferPool pool = CachingBufferPool.getInstance();
+
+ byte[] bb1 = pool.allocateArray(12 * 1024);
+ assertNotNull(bb1);
+ assertEquals(12 * 1024, bb1.length);
+
+ byte[] bb2 = pool.allocateArray(12 * 1024); // bb1 is still checked out, so this must be a different array
+ assertNotNull(bb2);
+ assertEquals(12 * 1024, bb2.length);
+
+ assertNotSame(bb1, bb2);
+
+ pool.releaseArray(bb2);
+
+ byte[] bb3 = pool.allocateArray(12 * 1024); // expected to be the array which was just released
+ assertNotNull(bb3);
+ assertEquals(12 * 1024, bb3.length);
+
+ assertNotSame(bb1, bb2); // NOTE(review): repeats the earlier check; bb1 vs bb3 may have been intended - confirm
+ assertSame(bb2, bb3);
+
+ pool.releaseArray(bb1);
+
+ byte[] bb4 = pool.allocateArray((12 * 1024) - 1); // one byte less than 12KB is expected to be served by the released 12KB array
+ assertNotNull(bb4);
+ assertEquals(12 * 1024, bb4.length);
+
+ assertSame(bb1, bb4);
+ }
+
+ @Test
+ public void testSoftReferences() { // a pooled array with no strong references left is expected to be dropped once memory is exhausted
+
+ BufferPool pool = CachingBufferPool.getInstance();
+ byte[] bb1 = pool.allocateArray(8 * 1024);
+ Reference<byte[]> ref = new WeakReference<byte[]>(bb1); // lets the test observe when the array has been collected
+ bb1[0] = 123; // marker values used below to recognize this particular array
+ bb1[8000] = -74;
+ int bb1HC = System.identityHashCode(bb1);
+
+ pool.releaseArray(bb1);
+
+ byte[] bb1_copy = pool.allocateArray(8 * 1024);
+ assertSame(bb1, bb1_copy);
+ assertEquals(123, bb1_copy[0]);
+ assertEquals(-74, bb1_copy[8000]);
+ assertEquals(bb1HC, System.identityHashCode(bb1_copy));
+
+ //release back into the pool (again)
+ pool.releaseArray(bb1);
+
+ //drop the strong references held by this test; the array is still expected to be reachable afterwards
+ bb1_copy = null;
+ bb1 = null;
+ assertNotNull(ref.get());
+
+ //force an OOME so that SoftReferences get cleared
+ List<byte[]> vals = forceOOMEGC(LIST_COUNT);
+ assertTrue("count: " + vals.size(), vals.size() < LIST_COUNT); // fewer arrays than requested means allocation was cut short
+
+ //assert that our test reference has been cleared
+ assertNull(ref.get());
+
+ //get another value from the pool
+ byte[] bb2 = pool.allocateArray(8 * 1024);
+ //assert that it is indeed a new value, and not the same one as before
+ assertNotEquals(123, bb2[0]);
+ assertNotEquals(-74, bb2[8000]);
+ assertNotEquals(bb1HC, System.identityHashCode(bb2));
+ }
+
+ private static List<byte[]> forceOOMEGC(int count) { // allocates up to count 10MB arrays and returns those it managed to allocate
+ final List<byte[]> vals = new ArrayList<>(count);
+
+ try {
+ for (int i=0; i<count; ++i) {
+ vals.add(new byte[10 * 1024 * 1024]);
+ }
+ } catch(Error e) { // deliberately ignored: presumably the OutOfMemoryError this helper exists to provoke
+
+ }
+ return vals;
+ }
+}
=====================================
version.sbt
=====================================
@@ -1 +1 @@
-version in ThisBuild := "1.1.7.3"
+version in ThisBuild := "1.1.7.5"
View it on GitLab: https://salsa.debian.org/java-team/snappy-java/-/compare/f959f782291b13da7dafe2eeeb4aa81bde16b332...ae0e91850fc98eb3785b7219b21daa74b4c0436d
--
View it on GitLab: https://salsa.debian.org/java-team/snappy-java/-/compare/f959f782291b13da7dafe2eeeb4aa81bde16b332...ae0e91850fc98eb3785b7219b21daa74b4c0436d
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20200615/01079b80/attachment.html>
More information about the pkg-java-commits
mailing list