summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/memory/memory-netty
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/java/memory/memory-netty')
-rw-r--r--src/arrow/java/memory/memory-netty/pom.xml72
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java56
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java34
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java448
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java622
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java280
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java270
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java161
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java38
-rw-r--r--src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java123
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java141
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java77
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java72
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java39
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java1183
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java88
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java51
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java108
-rw-r--r--src/arrow/java/memory/memory-netty/src/test/resources/logback.xml28
19 files changed, 3891 insertions, 0 deletions
diff --git a/src/arrow/java/memory/memory-netty/pom.xml b/src/arrow/java/memory/memory-netty/pom.xml
new file mode 100644
index 000000000..dee06a321
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>arrow-memory</artifactId>
+ <groupId>org.apache.arrow</groupId>
+ <version>6.0.1</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>arrow-memory-netty</artifactId>
+ <name>Arrow Memory - Netty</name>
+ <description>Netty allocator and utils for allocating memory in Arrow</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <!-- This profile turns on integration testing. It activates the failsafe plugin and will run any tests
+ with the 'IT' prefix. This should be run in a separate CI build or on developers machines as it potentially
+ uses quite a bit of memory. Activate the tests by adding -Pintegration-tests to your maven command line -->
+ <id>integration-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java
new file mode 100644
index 000000000..09b730044
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import org.apache.arrow.memory.BufferAllocator;
+
+/**
+ * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of
+ * the Netty framework
+ * (thus supporting RPC level memory accounting).
+ */
+public class ExpandableByteBuf extends MutableWrappedByteBuf {
+
+ private final BufferAllocator allocator;
+
+ public ExpandableByteBuf(ByteBuf buffer, BufferAllocator allocator) {
+ super(buffer);
+ this.allocator = allocator;
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ return new ExpandableByteBuf(buffer.copy(index, length), allocator);
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ if (newCapacity > capacity()) {
+ ByteBuf newBuf = NettyArrowBuf.unwrapBuffer(allocator.buffer(newCapacity));
+ newBuf.writeBytes(buffer, 0, buffer.capacity());
+ newBuf.readerIndex(buffer.readerIndex());
+ newBuf.writerIndex(buffer.writerIndex());
+ buffer.release();
+ buffer = newBuf;
+ return newBuf;
+ } else {
+ return super.capacity(newCapacity);
+ }
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 000000000..792b3b814
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+/**
+ * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and
+ * counts.
+ */
+public class LargeBuffer extends MutableWrappedByteBuf {
+
+ public LargeBuffer(ByteBuf buffer) {
+ super(buffer);
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ return new LargeBuffer(buffer.copy(index, length));
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 000000000..5221dd3c1
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.ByteProcessor;
+
+/**
+ * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override
+ * some behaviors and make buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+ ByteBuf buffer;
+
+ public MutableWrappedByteBuf(ByteBuf buffer) {
+ super(buffer.maxCapacity());
+
+ if (buffer instanceof MutableWrappedByteBuf) {
+ this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+ } else {
+ this.buffer = buffer;
+ }
+
+ setIndex(buffer.readerIndex(), buffer.writerIndex());
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return unwrap().nioBuffer(index, length);
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return buffer;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return buffer.alloc();
+ }
+
+ @Override
+ public ByteOrder order() {
+ return buffer.order();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+ @Override
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ buffer.capacity(newCapacity);
+ return this;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return buffer.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return buffer.hasMemoryAddress();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return buffer.memoryAddress();
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return _getByte(index);
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return buffer.getByte(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return _getShort(index);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ @Override
+ public short getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
+ protected short _getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ return _getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return buffer.getUnsignedMedium(index);
+ }
+
+ @Override
+ public int getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return _getInt(index);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ @Override
+ public int getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
+ protected int _getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return _getLong(index);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return buffer.getLong(index);
+ }
+
+ @Override
+ public long getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
+ protected long _getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
+ public abstract ByteBuf copy(int index, int length);
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ buffer.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ _setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ buffer.setByte(index, value);
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ _setShort(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ buffer.setShort(index, value);
+ }
+
+ @Override
+ public ByteBuf setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ _setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ buffer.setMedium(index, value);
+ }
+
+ @Override
+ public ByteBuf setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ _setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ buffer.setInt(index, value);
+ }
+
+ @Override
+ public ByteBuf setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ _setLong(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ buffer.setLong(index, value);
+ }
+
+ @Override
+ public ByteBuf setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ buffer.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public int setBytes(int index, FileChannel in, long position, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, position, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length)
+ throws IOException {
+ buffer.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, length);
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+
+ @Override
+ public int getBytes(int index, FileChannel out, long position, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, position, length);
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return buffer.nioBufferCount();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return buffer.nioBuffers(index, length);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ return nioBuffer(index, length);
+ }
+
+ @Override
+ public int forEachByte(int index, int length, ByteProcessor processor) {
+ return buffer.forEachByte(index, length, processor);
+ }
+
+ @Override
+ public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+ return buffer.forEachByteDesc(index, length, processor);
+ }
+
+ @Override
+ public final int refCnt() {
+ return unwrap().refCnt();
+ }
+
+ @Override
+ public final ByteBuf touch() {
+ unwrap().touch();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf touch(Object hint) {
+ unwrap().touch(hint);
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain() {
+ unwrap().retain();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain(int increment) {
+ unwrap().retain(increment);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = unwrap().release(decrement);
+ return released;
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java
new file mode 100644
index 000000000..8681b005f
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.ArrowByteBufAllocator;
+import org.apache.arrow.memory.BoundsChecking;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Netty specific wrapper over ArrowBuf for use in Netty framework.
+ */
+public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {
+
+ private final ArrowBuf arrowBuf;
+ private final ArrowByteBufAllocator arrowByteBufAllocator;
+ private int length;
+ private final long address;
+
+ /**
+ * Constructs a new instance.
+ *
+ * @param arrowBuf The buffer to wrap.
+ * @param bufferAllocator The allocator for the buffer.
+ * @param length The length of this buffer.
+ */
+ public NettyArrowBuf(
+ final ArrowBuf arrowBuf,
+ final BufferAllocator bufferAllocator,
+ final int length) {
+ super(length);
+ this.arrowBuf = arrowBuf;
+ this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator);
+ this.length = length;
+ this.address = arrowBuf.memoryAddress();
+ }
+
+ @Override
+ public ByteBuf copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuf retain() {
+ arrowBuf.getReferenceManager().retain();
+ return this;
+ }
+
+ public ArrowBuf arrowBuf() {
+ return arrowBuf;
+ }
+
+ @Override
+ public ByteBuf retain(final int increment) {
+ arrowBuf.getReferenceManager().retain(increment);
+ return this;
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public synchronized ByteBuf capacity(int newCapacity) {
+ if (newCapacity == length) {
+ return this;
+ }
+ Preconditions.checkArgument(newCapacity >= 0);
+ if (newCapacity < length) {
+ length = newCapacity;
+ return this;
+ }
+ throw new UnsupportedOperationException("Buffers don't support resizing that increases the size.");
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ throw new UnsupportedOperationException("Unwrap not supported.");
+ }
+
+ @Override
+ public int refCnt() {
+ return arrowBuf.getReferenceManager().getRefCount();
+ }
+
+ @Override
+ public ArrowByteBufAllocator alloc() {
+ return arrowByteBufAllocator;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return false;
+ }
+
+ @Override
+ public byte[] array() {
+ throw new UnsupportedOperationException("Operation not supported on direct buffer");
+ }
+
+ @Override
+ public int arrayOffset() {
+ throw new UnsupportedOperationException("Operation not supported on direct buffer");
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return true;
+ }
+
+ @Override
+ public long memoryAddress() {
+ return this.address;
+ }
+
+ @Override
+ public ByteBuf touch() {
+ return this;
+ }
+
+ @Override
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
+
+ @Override
+ public int capacity() {
+ return (int) Math.min(Integer.MAX_VALUE, arrowBuf.capacity());
+ }
+
+ @Override
+ public NettyArrowBuf slice() {
+ return unwrapBuffer(arrowBuf.slice(readerIndex, writerIndex - readerIndex));
+ }
+
+ @Override
+ public NettyArrowBuf slice(int index, int length) {
+ return unwrapBuffer(arrowBuf.slice(index, length));
+ }
+
+ @Override
+ public void close() {
+ arrowBuf.close();
+ }
+
+ @Override
+ public boolean release() {
+ return arrowBuf.getReferenceManager().release();
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ return arrowBuf.getReferenceManager().release(decrement);
+ }
+
+ @Override
+ public NettyArrowBuf readerIndex(int readerIndex) {
+ super.readerIndex(readerIndex);
+ return this;
+ }
+
+ @Override
+ public NettyArrowBuf writerIndex(int writerIndex) {
+ super.writerIndex(writerIndex);
+ return this;
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return 1;
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ ByteBuffer nioBuf = getDirectBuffer(index);
+ // Follows convention from other ByteBuf implementations.
+ return (ByteBuffer) nioBuf.clear().limit(length);
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers() {
+ return new ByteBuffer[] {nioBuffer()};
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return new ByteBuffer[] {nioBuffer(index, length)};
+ }
+
+ @Override
+ public ByteBuffer nioBuffer() {
+ return nioBuffer(readerIndex(), readableBytes());
+ }
+
+
+ /**
+ * Returns a buffer that is zero positioned but points
+ * to a slice of the original buffer starting at given index.
+ */
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ chk(index, length);
+ final ByteBuffer buffer = getDirectBuffer(index);
+ buffer.limit(length);
+ return buffer;
+ }
+
+ /**
+ * Returns a buffer that is zero positioned but points
+ * to a slice of the original buffer starting at given index.
+ */
+ public ByteBuffer nioBuffer(long index, int length) {
+ chk(index, length);
+ final ByteBuffer buffer = getDirectBuffer(index);
+ buffer.limit(length);
+ return buffer;
+ }
+
+ /**
+ * Get this ArrowBuf as a direct {@link ByteBuffer}.
+ *
+ * @return ByteBuffer
+ */
+ private ByteBuffer getDirectBuffer(long index) {
+ return PlatformDependent.directBuffer(addr(index), checkedCastToInt(length - index));
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ arrowBuf.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ arrowBuf.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ arrowBuf.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ arrowBuf.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ /**
+ * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}.
+ *
+ * @param index The starting index.
+ * @param length The length which will be utilized (starting from {@code index}).
+ * @param capacity The capacity that {@code index + length} is allowed to be within.
+ * @return {@code true} if the requested {@code index} and {@code length} will fit within {@code capacity}.
+ * {@code false} if this would result in an index out of bounds exception.
+ */
+ private static boolean isOutOfBounds(int index, int length, int capacity) {
+ return (index | length | (index + length) | (capacity - (index + length))) < 0;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ chk(index, length);
+ Preconditions.checkArgument(dst != null, "Expecting valid dst ByteBuffer");
+ if (isOutOfBounds(dstIndex, length, dst.capacity())) {
+ throw new IndexOutOfBoundsException("dstIndex: " + dstIndex + " length: " + length);
+ } else {
+ final long srcAddress = addr(index);
+ if (dst.hasMemoryAddress()) {
+ final long dstAddress = dst.memoryAddress() + (long) dstIndex;
+ PlatformDependent.copyMemory(srcAddress, dstAddress, (long) length);
+ } else if (dst.hasArray()) {
+ dstIndex += dst.arrayOffset();
+ PlatformDependent.copyMemory(srcAddress, dst.array(), dstIndex, (long) length);
+ } else {
+ dst.setBytes(dstIndex, this, index, length);
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ chk(index, length);
+ Preconditions.checkArgument(src != null, "Expecting valid src ByteBuffer");
+ if (isOutOfBounds(srcIndex, length, src.capacity())) {
+ throw new IndexOutOfBoundsException("srcIndex: " + srcIndex + " length: " + length);
+ } else {
+ if (length != 0) {
+ final long dstAddress = addr(index);
+ if (src.hasMemoryAddress()) {
+ final long srcAddress = src.memoryAddress() + (long) srcIndex;
+ PlatformDependent.copyMemory(srcAddress, dstAddress, (long) length);
+ } else if (src.hasArray()) {
+ srcIndex += src.arrayOffset();
+ PlatformDependent.copyMemory(src.array(), srcIndex, dstAddress, (long) length);
+ } else {
+ src.getBytes(srcIndex, this, index, length);
+ }
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ arrowBuf.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ return arrowBuf.setBytes(index, in, length);
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ Preconditions.checkArgument(out != null, "expecting valid gathering byte channel");
+ chk(index, length);
+ if (length == 0) {
+ return 0;
+ } else {
+ final ByteBuffer tmpBuf = getDirectBuffer(index);
+ tmpBuf.clear().limit(length);
+ return out.write(tmpBuf);
+ }
+ }
+
+ @Override
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ chk(index, length);
+ if (length == 0) {
+ return 0;
+ } else {
+ final ByteBuffer tmpBuf = getDirectBuffer(index);
+ tmpBuf.clear().limit(length);
+ return out.write(tmpBuf, position);
+ }
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ return (int) in.read(nioBuffers(index, length));
+ }
+
+ @Override
+ public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+ return (int) in.read(nioBuffers(index, length));
+ }
+
+ @Override
+ public ByteOrder order() {
+ return ByteOrder.LITTLE_ENDIAN;
+ }
+
+ @Override
+ public ByteBuf order(ByteOrder endianness) {
+ return this;
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ this.chk(index, 3);
+ long addr = this.addr(index);
+ return PlatformDependent.getByte(addr) & 255 |
+ (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8;
+ }
+
+
+ /*-------------------------------------------------*
+ | |
+ | get() APIs |
+ | |
+ *-------------------------------------------------*/
+
+
+ @Override
+ protected byte _getByte(int index) {
+ return getByte(index);
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return arrowBuf.getByte(index);
+ }
+
+ @Override
+ protected short _getShortLE(int index) {
+ short s = getShort(index);
+ return Short.reverseBytes(s);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return getShort(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return arrowBuf.getShort(index);
+ }
+
+ @Override
+ protected int _getIntLE(int index) {
+ int value = getInt(index);
+ return Integer.reverseBytes(value);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return getInt(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return arrowBuf.getInt(index);
+ }
+
+ @Override
+ protected long _getLongLE(int index) {
+ long value = getLong(index);
+ return Long.reverseBytes(value);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return getLong(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return arrowBuf.getLong(index);
+ }
+
+
+ /*-------------------------------------------------*
+ | |
+ | set() APIs |
+ | |
+ *-------------------------------------------------*/
+
+
+ @Override
+ protected void _setByte(int index, int value) {
+ setByte(index, value);
+ }
+
+ @Override
+ public NettyArrowBuf setByte(int index, int value) {
+ arrowBuf.setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShortLE(int index, int value) {
+ this.chk(index, 2);
+ PlatformDependent.putShort(this.addr(index), Short.reverseBytes((short) value));
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ setShort(index, value);
+ }
+
+ @Override
+ public NettyArrowBuf setShort(int index, int value) {
+ arrowBuf.setShort(index, value);
+ return this;
+ }
+
+ private long addr(long index) {
+ return address + index;
+ }
+
+ /**
+ * Helper function to do bounds checking at a particular
+ * index for particular length of data.
+ *
+ * @param index index (0 based relative to this ArrowBuf)
+ * @param fieldLength provided length of data for get/set
+ */
+ private void chk(long index, long fieldLength) {
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
+ // check reference count
+ ensureAccessible();
+ // check bounds
+ if (fieldLength < 0) {
+ throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
+ }
+ if (index < 0 || index > capacity() - fieldLength) {
+ throw new IndexOutOfBoundsException(String.format(
+ "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
+ }
+ }
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ setMedium(index, value);
+ }
+
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ this.chk(index, 3);
+ long addr = this.addr(index);
+ PlatformDependent.putByte(addr, (byte) value);
+ PlatformDependent.putShort(addr + 1L, Short.reverseBytes((short) (value >>> 8)));
+ }
+
+ @Override
+ public NettyArrowBuf setMedium(int index, int value) {
+ chk(index, 3);
+ final long addr = addr(index);
+ // we need to store 3 bytes starting from least significant byte
+ // and ignoring the most significant byte
+ // since arrow memory format is little endian, we will
+ // first store the first 2 bytes followed by third byte
+ // example: if the 4 byte int value is ABCD where A is MSB
+ // D is LSB then we effectively want to store DCB in increasing
+ // address to get Little Endian byte order
+ // (short)value will give us CD and PlatformDependent.putShort()
+ // will store them in LE order as DC starting at address addr
+ // in order to get B, we do ABCD >>> 16 = 00AB => (byte)AB which
+ // gives B. We store this at address addr + 2. So finally we get
+ // DCB
+ PlatformDependent.putShort(addr, (short) value);
+ PlatformDependent.putByte(addr + 2, (byte) (value >>> 16));
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ setInt(index, value);
+ }
+
+ @Override
+ protected void _setIntLE(int index, int value) {
+ this.chk(index, 4);
+ PlatformDependent.putInt(this.addr(index), Integer.reverseBytes(value));
+ }
+
+ @Override
+ public NettyArrowBuf setInt(int index, int value) {
+ arrowBuf.setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ setLong(index, value);
+ }
+
+ @Override
+ public void _setLongLE(int index, long value) {
+ this.chk(index, 8);
+ PlatformDependent.putLong(this.addr(index), Long.reverseBytes(value));
+ }
+
+ @Override
+ public NettyArrowBuf setLong(int index, long value) {
+ arrowBuf.setLong(index, value);
+ return this;
+ }
+
+ /**
+ * unwrap arrow buffer into a netty buffer.
+ */
+ public static NettyArrowBuf unwrapBuffer(ArrowBuf buf) {
+ final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf(
+ buf,
+ buf.getReferenceManager().getAllocator(),
+ checkedCastToInt(buf.capacity()));
+ nettyArrowBuf.readerIndex(checkedCastToInt(buf.readerIndex()));
+ nettyArrowBuf.writerIndex(checkedCastToInt(buf.writerIndex()));
+ return nettyArrowBuf;
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 000000000..d0a5a9945
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+
+import io.netty.util.internal.OutOfDirectMemoryError;
+import io.netty.util.internal.StringUtil;
+
+/**
+ * The base allocator that we use for all of Arrow's memory management. Returns
+ * UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
+
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
+
+ private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+ public final UnsafeDirectLittleEndian empty;
+ private final AtomicLong hugeBufferSize = new AtomicLong(0);
+ private final AtomicLong hugeBufferCount = new AtomicLong(0);
+ private final AtomicLong normalBufferSize = new AtomicLong(0);
+ private final AtomicLong normalBufferCount = new AtomicLong(0);
+ private final InnerAllocator allocator;
+
+ public PooledByteBufAllocatorL() {
+ allocator = new InnerAllocator();
+ empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+ }
+
+ /**
+ * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
+ */
+ public UnsafeDirectLittleEndian allocate(long size) {
+ try {
+ return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE);
+ } catch (OutOfMemoryError e) {
+ /*
+ * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
+ * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
+ * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
+ * as Netty is expected to throw an OutOfDirectMemoryError first.
+ */
+ if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) {
+ throw new OutOfMemoryException("Failure allocating buffer.", e);
+ }
+ throw e;
+ }
+ }
+
+ public int getChunkSize() {
+ return allocator.chunkSize;
+ }
+
+ public long getHugeBufferSize() {
+ return hugeBufferSize.get();
+ }
+
+ public long getHugeBufferCount() {
+ return hugeBufferCount.get();
+ }
+
+ public long getNormalBufferSize() {
+ return normalBufferSize.get();
+ }
+
+ public long getNormalBufferCount() {
+ return normalBufferSize.get();
+ }
+
+ private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+
+ private final long initialCapacity;
+ private final AtomicLong count;
+ private final AtomicLong size;
+
+ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
+ AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ @Override
+ public ByteBuf copy() {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = super.release(decrement);
+ if (released) {
+ count.decrementAndGet();
+ size.addAndGet(-initialCapacity);
+ }
+ return released;
+ }
+
+ }
+
+ private class InnerAllocator extends PooledByteBufAllocator {
+
+ private final PoolArena<ByteBuffer>[] directArenas;
+ private final MemoryStatusThread statusThread;
+ private final int chunkSize;
+
+ public InnerAllocator() {
+ super(true);
+
+ try {
+ Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+ f.setAccessible(true);
+ this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ }
+
+ this.chunkSize = directArenas[0].chunkSize;
+
+ if (memoryLogger.isTraceEnabled()) {
+ statusThread = new MemoryStatusThread();
+ statusThread.start();
+ } else {
+ statusThread = null;
+ }
+ }
+
+ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+ PoolThreadCache cache = threadCache();
+ PoolArena<ByteBuffer> directArena = cache.directArena;
+
+ if (directArena != null) {
+
+ if (initialCapacity > directArena.chunkSize) {
+ // This is beyond chunk size so we'll allocate separately.
+ ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+ hugeBufferSize.addAndGet(buf.capacity());
+ hugeBufferCount.incrementAndGet();
+
+ // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+ return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
+ hugeBufferSize);
+ } else {
+ // within chunk, use arena.
+ ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+ if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+ fail();
+ }
+
+ if (!ASSERT_ENABLED) {
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
+ }
+
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+
+ return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
+ normalBufferCount, normalBufferSize);
+ }
+
+ } else {
+ throw fail();
+ }
+ }
+
+ private UnsupportedOperationException fail() {
+ return new UnsupportedOperationException(
+ "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
+ "didn't provide that functionality.");
+ }
+
+ @Override
+ public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+ if (initialCapacity == 0 && maxCapacity == 0) {
+ newDirectBuffer(initialCapacity, maxCapacity);
+ }
+ validate(initialCapacity, maxCapacity);
+ return newDirectBufferL(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
+ }
+
+
+ private void validate(int initialCapacity, int maxCapacity) {
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)");
+ }
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+ initialCapacity, maxCapacity));
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(directArenas.length);
+ buf.append(" direct arena(s):");
+ buf.append(StringUtil.NEWLINE);
+ for (PoolArena<ByteBuffer> a : directArenas) {
+ buf.append(a);
+ }
+
+ buf.append("Large buffers outstanding: ");
+ buf.append(hugeBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(hugeBufferSize.get());
+ buf.append(" bytes.");
+ buf.append('\n');
+ buf.append("Normal buffers outstanding: ");
+ buf.append(normalBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(normalBufferSize.get());
+ buf.append(" bytes.");
+ return buf.toString();
+ }
+
+ private class MemoryStatusThread extends Thread {
+
+ public MemoryStatusThread() {
+ super("allocation.logger");
+ this.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ }
+
+
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
new file mode 100644
index 000000000..e900b1ca7
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs
+ * to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
+
+ public static final boolean ASSERT_ENABLED;
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
+ static {
+ boolean isAssertEnabled = false;
+ assert isAssertEnabled = true;
+ ASSERT_ENABLED = isAssertEnabled;
+ }
+
+ public final long id = ID_GENERATOR.incrementAndGet();
+ private final AbstractByteBuf wrapped;
+ private final long memoryAddress;
+
+ UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+ this(buf, true);
+ }
+
+ UnsafeDirectLittleEndian(LargeBuffer buf) {
+ this(buf, true);
+ }
+
+ UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+ this(buf, true);
+ }
+
+ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+ super(buf);
+
+ this.wrapped = buf;
+ this.memoryAddress = buf.memoryAddress();
+ }
+
+ private long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ @Override
+ public long getLong(int index) {
+ // wrapped.checkIndex(index, 8);
+ long v = PlatformDependent.getLong(addr(index));
+ return v;
+ }
+
+ @Override
+ public float getFloat(int index) {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ @Override
+ public ByteBuf slice() {
+ return slice(this.readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf order(ByteOrder endianness) {
+ return this;
+ }
+
+ @Override
+ public double getDouble(int index) {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ @Override
+ public char getChar(int index) {
+ return (char) getShort(index);
+ }
+
+ @Override
+ public long getUnsignedInt(int index) {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ @Override
+ public int getInt(int index) {
+ int v = PlatformDependent.getInt(addr(index));
+ return v;
+ }
+
+ @Override
+ public int getUnsignedShort(int index) {
+ return getShort(index) & 0xFFFF;
+ }
+
+ @Override
+ public short getShort(int index) {
+ short v = PlatformDependent.getShort(addr(index));
+ return v;
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ wrapped.checkIndex(index, 2);
+ setShort_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ wrapped.checkIndex(index, 4);
+ setInt_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ wrapped.checkIndex(index, 8);
+ setLong_(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setChar(int index, int value) {
+ setShort(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setFloat(int index, float value) {
+ setInt(index, Float.floatToRawIntBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf setDouble(int index, double value) {
+ setLong(index, Double.doubleToRawLongBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeShort(int value) {
+ wrapped.ensureWritable(2);
+ setShort_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 2;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeInt(int value) {
+ wrapped.ensureWritable(4);
+ setInt_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 4;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeLong(long value) {
+ wrapped.ensureWritable(8);
+ setLong_(wrapped.writerIndex, value);
+ wrapped.writerIndex += 8;
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeChar(int value) {
+ writeShort(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeFloat(float value) {
+ writeInt(Float.floatToRawIntBits(value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeDouble(double value) {
+ writeLong(Double.doubleToRawLongBits(value));
+ return this;
+ }
+
+ private void setShort_(int index, int value) {
+ PlatformDependent.putShort(addr(index), (short) value);
+ }
+
+ private void setInt_(int index, int value) {
+ PlatformDependent.putInt(addr(index), value);
+ }
+
+ private void setLong_(int index, long value) {
+ PlatformDependent.putLong(addr(index), value);
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return PlatformDependent.getByte(addr(index));
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ PlatformDependent.putByte(addr(index), (byte) value);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ wrapped.checkIndex(index, length);
+ byte[] tmp = new byte[length];
+ int readBytes = in.read(tmp);
+ if (readBytes > 0) {
+ PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
+ }
+ return readBytes;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ wrapped.checkIndex(index, length);
+ if (length != 0) {
+ byte[] tmp = new byte[length];
+ PlatformDependent.copyMemory(addr(index), tmp, 0, length);
+ out.write(tmp);
+ }
+ return this;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
new file mode 100644
index 000000000..ff40b49ff
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.ExpandableByteBuf;
+import io.netty.buffer.NettyArrowBuf;
+
+/**
+ * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC
+ * layer to be accounted
+ * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a
+ * typical BufferAllocator is
+ * the signature and the fact that this Allocator returns ExpandableByteBufs which enable
+ * otherwise non-expandable
+ * ArrowBufs to be expandable.
+ *
+ * @deprecated This class may be removed in a future release.
+ */
+@Deprecated
+public class ArrowByteBufAllocator extends AbstractByteBufAllocator {
+
+ private static final int DEFAULT_BUFFER_SIZE = 4096;
+ private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
+
+ private final BufferAllocator allocator;
+
+ public ArrowByteBufAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ public BufferAllocator unwrap() {
+ return allocator;
+ }
+
+ @Override
+ public ByteBuf buffer() {
+ return buffer(DEFAULT_BUFFER_SIZE);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity) {
+ return new ExpandableByteBuf(NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)), allocator);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer() {
+ return buffer();
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity) {
+ return buffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf directBuffer() {
+ return buffer();
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity) {
+ return NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity));
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer() {
+ return compositeBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+ return new CompositeByteBuf(this, true, maxNumComponents);
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer() {
+ return compositeBuffer();
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+ return compositeBuffer(maxNumComponents);
+ }
+
+ @Override
+ public boolean isDirectBufferPooled() {
+ return false;
+ }
+
+ @Override
+ public ByteBuf heapBuffer() {
+ throw fail();
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity) {
+ throw fail();
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw fail();
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer() {
+ throw fail();
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+ throw fail();
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ throw fail();
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity, maxCapacity);
+ }
+
+ private RuntimeException fail() {
+ throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java
new file mode 100644
index 000000000..10cfb5c16
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * The default Allocation Manager Factory for a module.
+ *
+ */
+public class DefaultAllocationManagerFactory implements AllocationManager.Factory {
+
+ public static final AllocationManager.Factory FACTORY = NettyAllocationManager.FACTORY;
+
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long size) {
+ return FACTORY.create(accountingAllocator, size);
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ return FACTORY.empty();
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java
new file mode 100644
index 000000000..200047783
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * The default implementation of {@link AllocationManager}. The implementation is responsible for managing when memory
+ * is allocated and returned to the Netty-based PooledByteBufAllocatorL.
+ */
+public class NettyAllocationManager extends AllocationManager {
+
+ public static final AllocationManager.Factory FACTORY = new AllocationManager.Factory() {
+
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long size) {
+ return new NettyAllocationManager(accountingAllocator, size);
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ return EMPTY_BUFFER;
+ }
+ };
+
+ /**
+ * The default cut-off value for switching allocation strategies.
+ * If the request size is not greater than the cut-off value, we will allocate memory by
+ * {@link PooledByteBufAllocatorL} APIs,
+ * otherwise, we will use {@link PlatformDependent} APIs.
+ */
+ public static final int DEFAULT_ALLOCATION_CUTOFF_VALUE = Integer.MAX_VALUE;
+
+ private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
+ static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
+ static final ArrowBuf EMPTY_BUFFER = new ArrowBuf(ReferenceManager.NO_OP,
+ null,
+ 0,
+ NettyAllocationManager.EMPTY.memoryAddress());
+ static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();
+
+ private final long allocatedSize;
+ private final UnsafeDirectLittleEndian memoryChunk;
+ private final long allocatedAddress;
+
+ /**
+ * The cut-off value for switching allocation strategies.
+ */
+ private final int allocationCutOffValue;
+
+ NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) {
+ super(accountingAllocator);
+ this.allocationCutOffValue = allocationCutOffValue;
+
+ if (requestedSize > allocationCutOffValue) {
+ this.memoryChunk = null;
+ this.allocatedAddress = PlatformDependent.allocateMemory(requestedSize);
+ this.allocatedSize = requestedSize;
+ } else {
+ this.memoryChunk = INNER_ALLOCATOR.allocate(requestedSize);
+ this.allocatedAddress = memoryChunk.memoryAddress();
+ this.allocatedSize = memoryChunk.capacity();
+ }
+ }
+
+ NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize) {
+ this(accountingAllocator, requestedSize, DEFAULT_ALLOCATION_CUTOFF_VALUE);
+ }
+
+ /**
+ * Get the underlying memory chunk managed by this AllocationManager.
+ * @return the underlying memory chunk if the request size is not greater than the
+ * {@link NettyAllocationManager#allocationCutOffValue}, or null otherwise.
+ *
+ * @deprecated this method will be removed in a future release.
+ */
+ @Deprecated
+ UnsafeDirectLittleEndian getMemoryChunk() {
+ return memoryChunk;
+ }
+
+ @Override
+ protected long memoryAddress() {
+ return allocatedAddress;
+ }
+
+ @Override
+ protected void release0() {
+ if (memoryChunk == null) {
+ PlatformDependent.freeMemory(allocatedAddress);
+ } else {
+ memoryChunk.release();
+ }
+ }
+
+ /**
+ * Returns the underlying memory chunk size managed.
+ *
+ * <p>NettyAllocationManager rounds requested size up to the next power of two.
+ */
+ @Override
+ public long getSize() {
+ return allocatedSize;
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java
new file mode 100644
index 000000000..916cf82e7
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.ArrowByteBufAllocator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNettyArrowBuf {
+
+ @Test
+ public void testSliceWithoutArgs() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ ) {
+ NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf);
+ nettyBuf.writerIndex(20);
+ nettyBuf.readerIndex(10);
+ NettyArrowBuf slicedBuffer = nettyBuf.slice();
+ int readableBytes = slicedBuffer.readableBytes();
+ Assert.assertEquals(10, readableBytes);
+ }
+ }
+
+ @Test
+ public void testNioBuffer() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ ) {
+ NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf);
+ ByteBuffer byteBuffer = nettyBuf.nioBuffer(4, 6);
+ // Nio Buffers should always be 0 indexed
+ Assert.assertEquals(0, byteBuffer.position());
+ Assert.assertEquals(6, byteBuffer.limit());
+ // Underlying buffer has size 32 excluding 4 should have capacity of 28.
+ Assert.assertEquals(28, byteBuffer.capacity());
+
+ }
+ }
+
+ @Test
+ public void testInternalNioBuffer() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ ) {
+ NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf);
+ ByteBuffer byteBuffer = nettyBuf.internalNioBuffer(4, 6);
+ Assert.assertEquals(0, byteBuffer.position());
+ Assert.assertEquals(6, byteBuffer.limit());
+ // Underlying buffer has size 32 excluding 4 should have capacity of 28.
+ Assert.assertEquals(28, byteBuffer.capacity());
+
+ }
+ }
+
+ @Test
+ public void testSetLEValues() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ ) {
+ NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf);
+ int [] intVals = new int[] {Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MIN_VALUE, 0 ,
+ Short.MAX_VALUE , Short.MAX_VALUE + 1, Integer.MAX_VALUE};
+ for (int intValue :intVals ) {
+ nettyBuf._setInt(0, intValue);
+ Assert.assertEquals(nettyBuf._getIntLE(0), Integer.reverseBytes(intValue));
+ }
+
+ long [] longVals = new long[] {Long.MIN_VALUE, 0 , Long.MAX_VALUE};
+ for (long longValue :longVals ) {
+ nettyBuf._setLong(0, longValue);
+ Assert.assertEquals(nettyBuf._getLongLE(0), Long.reverseBytes(longValue));
+ }
+
+ short [] shortVals = new short[] {Short.MIN_VALUE, 0 , Short.MAX_VALUE};
+ for (short shortValue :shortVals ) {
+ nettyBuf._setShort(0, shortValue);
+ Assert.assertEquals(nettyBuf._getShortLE(0), Short.reverseBytes(shortValue));
+ }
+ }
+ }
+
+ @Test
+ public void testSetCompositeBuffer() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20));
+ ) {
+ CompositeByteBuf byteBufs = new CompositeByteBuf(new ArrowByteBufAllocator(allocator),
+ true, 1);
+ int expected = 4;
+ buf2.setInt(0, expected);
+ buf2.writerIndex(4);
+ byteBufs.addComponent(true, buf2);
+ NettyArrowBuf.unwrapBuffer(buf).setBytes(0, byteBufs, 4);
+ int actual = buf.getInt(0);
+ Assert.assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testGetCompositeBuffer() {
+ try (BufferAllocator allocator = new RootAllocator(128);
+ ArrowBuf buf = allocator.buffer(20);
+ ) {
+ CompositeByteBuf byteBufs = new CompositeByteBuf(new ArrowByteBufAllocator(allocator),
+ true, 1);
+ int expected = 4;
+ buf.setInt(0, expected);
+ NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20));
+ // composite buffers are a bit weird, need to jump hoops
+ // to set capacity.
+ byteBufs.addComponent(true, buf2);
+ byteBufs.capacity(20);
+ NettyArrowBuf.unwrapBuffer(buf).getBytes(0, byteBufs, 4);
+ int actual = byteBufs.getInt(0);
+ Assert.assertEquals(expected, actual);
+ byteBufs.component(0).release();
+ }
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java
new file mode 100644
index 000000000..c2bd95bb3
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+public class TestUnsafeDirectLittleEndian {
+ private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+ @Test
+ public void testPrimitiveGetSet() {
+ ByteBuf byteBuf = Unpooled.directBuffer(64);
+ UnsafeDirectLittleEndian unsafeDirect = new UnsafeDirectLittleEndian(new LargeBuffer(byteBuf));
+
+ unsafeDirect.setByte(0, Byte.MAX_VALUE);
+ unsafeDirect.setByte(1, -1); // 0xFF
+ unsafeDirect.setShort(2, Short.MAX_VALUE);
+ unsafeDirect.setShort(4, -2); // 0xFFFE
+ unsafeDirect.setInt(8, Integer.MAX_VALUE);
+ unsafeDirect.setInt(12, -66052); // 0xFFFE FDFC
+ unsafeDirect.setLong(16, Long.MAX_VALUE);
+ unsafeDirect.setLong(24, -4295098372L); // 0xFFFF FFFE FFFD FFFC
+ unsafeDirect.setFloat(32, 1.23F);
+ unsafeDirect.setFloat(36, -1.23F);
+ unsafeDirect.setDouble(40, 1.234567D);
+ unsafeDirect.setDouble(48, -1.234567D);
+
+ assertEquals(Byte.MAX_VALUE, unsafeDirect.getByte(0));
+ assertEquals(-1, unsafeDirect.getByte(1));
+ assertEquals(Short.MAX_VALUE, unsafeDirect.getShort(2));
+ assertEquals(-2, unsafeDirect.getShort(4));
+ assertEquals((char) 65534, unsafeDirect.getChar(4));
+ assertEquals(Integer.MAX_VALUE, unsafeDirect.getInt(8));
+ assertEquals(-66052, unsafeDirect.getInt(12));
+ assertEquals(4294901244L, unsafeDirect.getUnsignedInt(12));
+ assertEquals(Long.MAX_VALUE, unsafeDirect.getLong(16));
+ assertEquals(-4295098372L, unsafeDirect.getLong(24));
+ assertEquals(1.23F, unsafeDirect.getFloat(32), 0.0);
+ assertEquals(-1.23F, unsafeDirect.getFloat(36), 0.0);
+ assertEquals(1.234567D, unsafeDirect.getDouble(40), 0.0);
+ assertEquals(-1.234567D, unsafeDirect.getDouble(48), 0.0);
+
+ byte[] inBytes = "1234567".getBytes(StandardCharsets.UTF_8);
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(inBytes);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ assertEquals(5, unsafeDirect.setBytes(56, bais, 5));
+ unsafeDirect.getBytes(56, baos, 5);
+ assertEquals("12345", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java
new file mode 100644
index 000000000..fa8d510e3
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Integration test for large (more than 2GB) {@link org.apache.arrow.memory.ArrowBuf}.
+ * To run this test, please make sure there is at least 4GB memory in the system.
+ */
+public class ITTestLargeArrowBuf {
+ private static final Logger logger = LoggerFactory.getLogger(ITTestLargeArrowBuf.class);
+
+ private void run(long bufSize) {
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ ArrowBuf largeBuf = allocator.buffer(bufSize)) {
+ assertEquals(bufSize, largeBuf.capacity());
+ logger.trace("Successfully allocated a buffer with capacity {}", largeBuf.capacity());
+
+ for (long i = 0; i < bufSize / 8; i++) {
+ largeBuf.setLong(i * 8, i);
+
+ if ((i + 1) % 10000 == 0) {
+ logger.trace("Successfully written {} long words", i + 1);
+ }
+ }
+ logger.trace("Successfully written {} long words", bufSize / 8);
+
+ for (long i = 0; i < bufSize / 8; i++) {
+ long val = largeBuf.getLong(i * 8);
+ assertEquals(i, val);
+
+ if ((i + 1) % 10000 == 0) {
+ logger.trace("Successfully read {} long words", i + 1);
+ }
+ }
+ logger.trace("Successfully read {} long words", bufSize / 8);
+ }
+ logger.trace("Successfully released the large buffer.");
+ }
+
+ @Test
+ public void testLargeArrowBuf() {
+ run(4 * 1024 * 1024 * 1024L);
+ }
+
+ @Test
+ public void testMaxIntArrowBuf() {
+ run(Integer.MAX_VALUE);
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java
new file mode 100644
index 000000000..2dbd56480
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Test cases for {@link AllocationManager}.
+ */
+public class TestAllocationManagerNetty {
+
+ @Test
+ public void testAllocationManagerType() {
+ // test netty allocation manager type
+ System.setProperty(
+ DefaultAllocationManagerOption.ALLOCATION_MANAGER_TYPE_PROPERTY_NAME, "Netty");
+ DefaultAllocationManagerOption.AllocationManagerType mgrType =
+ DefaultAllocationManagerOption.getDefaultAllocationManagerType();
+
+ assertEquals(DefaultAllocationManagerOption.AllocationManagerType.Netty, mgrType);
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
new file mode 100644
index 000000000..ef49e4178
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -0,0 +1,1183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.arrow.memory.AllocationOutcomeDetails.Entry;
+import org.apache.arrow.memory.rounding.RoundingPolicy;
+import org.apache.arrow.memory.rounding.SegmentRoundingPolicy;
+import org.apache.arrow.memory.util.AssertionUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import sun.misc.Unsafe;
+
+public class TestBaseAllocator {
+
+ private static final int MAX_ALLOCATION = 8 * 1024;
+
+ /*
+ // ---------------------------------------- DEBUG -----------------------------------
+
+ @After
+ public void checkBuffers() {
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ if (bufferCount != 0) {
+ UnsafeDirectLittleEndian.logBuffers(logger);
+ UnsafeDirectLittleEndian.releaseBuffers();
+ }
+
+ assertEquals(0, bufferCount);
+ }
+
+ // @AfterClass
+ // public static void dumpBuffers() {
+ // UnsafeDirectLittleEndian.logBuffers(logger);
+ // }
+
+ // ---------------------------------------- DEBUG ------------------------------------
+ */
+
+
+ @Test
+ public void test_privateMax() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf1);
+
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf2);
+ arrowBuf2.getReferenceManager().release();
+ }
+
+ arrowBuf1.getReferenceManager().release();
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRootAllocator_closeWithOutstanding() throws Exception {
+ try {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = rootAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ }
+ } finally {
+ /*
+ * We expect there to be one unreleased underlying buffer because we're closing
+ * without releasing it.
+ */
+ /*
+ // ------------------------------- DEBUG ---------------------------------
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ UnsafeDirectLittleEndian.releaseBuffers();
+ assertEquals(1, bufferCount);
+ // ------------------------------- DEBUG ---------------------------------
+ */
+ }
+ }
+
+ @Test
+ public void testRootAllocator_getEmpty() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = rootAllocator.buffer(0);
+ assertNotNull("allocation failed", arrowBuf);
+ assertEquals("capacity was non-zero", 0, arrowBuf.capacity());
+ assertTrue("address should be valid", arrowBuf.memoryAddress() != 0);
+ arrowBuf.getReferenceManager().release();
+ }
+ }
+
+ @Ignore // TODO(DRILL-2740)
+ @Test(expected = IllegalStateException.class)
+ public void testAllocator_unreleasedEmpty() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf = rootAllocator.buffer(0);
+ }
+ }
+
+ @Test
+ public void testAllocator_transferOwnership() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 =
+ rootAllocator.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
+ rootAllocator.verify();
+ final ReferenceManager referenceManager = arrowBuf1.getReferenceManager();
+ OwnershipTransferResult transferOwnership = referenceManager.transferOwnership(arrowBuf1, childAllocator2);
+ assertEquiv(arrowBuf1, transferOwnership.getTransferredBuffer());
+ final boolean allocationFit = transferOwnership.getAllocationFit();
+ rootAllocator.verify();
+ assertTrue(allocationFit);
+
+ arrowBuf1.getReferenceManager().release();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ transferOwnership.getTransferredBuffer().getReferenceManager().release();
+ childAllocator2.close();
+ }
+ }
+
+ static <T> boolean equalsIgnoreOrder(Collection<T> c1, Collection<T> c2) {
+ return (c1.size() == c2.size() && c1.containsAll(c2));
+ }
+
+ @Test
+ public void testAllocator_getParentAndChild() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ assertEquals(rootAllocator.getParentAllocator(), null);
+
+ try (final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) {
+ assertEquals(childAllocator1.getParentAllocator(), rootAllocator);
+ assertTrue(
+ equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators()));
+
+ try (final BufferAllocator childAllocator2 =
+ rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) {
+ assertEquals(childAllocator2.getParentAllocator(), rootAllocator);
+ assertTrue(equalsIgnoreOrder(Arrays.asList(childAllocator1, childAllocator2),
+ rootAllocator.getChildAllocators()));
+
+ try (final BufferAllocator grandChildAllocator =
+ childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) {
+ assertEquals(grandChildAllocator.getParentAllocator(), childAllocator1);
+ assertTrue(equalsIgnoreOrder(Arrays.asList(grandChildAllocator),
+ childAllocator1.getChildAllocators()));
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_childRemovedOnClose() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator2 =
+ rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) {
+
+ // root has two child allocators
+ assertTrue(equalsIgnoreOrder(Arrays.asList(childAllocator1, childAllocator2),
+ rootAllocator.getChildAllocators()));
+
+ try (final BufferAllocator grandChildAllocator =
+ childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) {
+
+ // child1 has one allocator i.e grand-child
+ assertTrue(equalsIgnoreOrder(Arrays.asList(grandChildAllocator),
+ childAllocator1.getChildAllocators()));
+ }
+
+ // grand-child closed
+ assertTrue(
+ equalsIgnoreOrder(Collections.EMPTY_SET, childAllocator1.getChildAllocators()));
+ }
+ // root has only one child left
+ assertTrue(
+ equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators()));
+ }
+ // all child allocators closed.
+ assertTrue(equalsIgnoreOrder(Collections.EMPTY_SET, rootAllocator.getChildAllocators()));
+ }
+ }
+
+ @Test
+ public void testAllocator_shareOwnership() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0,
+ MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0,
+ MAX_ALLOCATION);
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
+ rootAllocator.verify();
+
+ // share ownership of buffer.
+ final ArrowBuf arrowBuf2 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator2);
+ rootAllocator.verify();
+ assertNotNull(arrowBuf2);
+ assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
+
+ // release original buffer (thus transferring ownership to allocator 2. (should leave
+ // allocator 1 in empty state)
+ arrowBuf1.getReferenceManager().release();
+ rootAllocator.verify();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("shareOwnership3", 0,
+ MAX_ALLOCATION);
+ final ArrowBuf arrowBuf3 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator3);
+ assertNotNull(arrowBuf3);
+ assertNotEquals(arrowBuf3, arrowBuf1);
+ assertNotEquals(arrowBuf3, arrowBuf2);
+ assertEquiv(arrowBuf1, arrowBuf3);
+ rootAllocator.verify();
+
+ arrowBuf2.getReferenceManager().release();
+ rootAllocator.verify();
+ childAllocator2.close();
+ rootAllocator.verify();
+
+ arrowBuf3.getReferenceManager().release();
+ rootAllocator.verify();
+ childAllocator3.close();
+ }
+ }
+
+ @Test
+ public void testRootAllocator_createChildAndUse() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator(
+ "createChildAndUse", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = childAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ arrowBuf.getReferenceManager().release();
+ }
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRootAllocator_createChildDontClose() throws Exception {
+ try {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator = rootAllocator.newChildAllocator(
+ "createChildDontClose", 0, MAX_ALLOCATION);
+ final ArrowBuf arrowBuf = childAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ }
+ } finally {
+ /*
+ * We expect one underlying buffer because we closed a child allocator without
+ * releasing the buffer allocated from it.
+ */
+ /*
+ // ------------------------------- DEBUG ---------------------------------
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ UnsafeDirectLittleEndian.releaseBuffers();
+ assertEquals(1, bufferCount);
+ // ------------------------------- DEBUG ---------------------------------
+ */
+ }
+ }
+
+ @Test
+ public void testSegmentAllocator() {
+ RoundingPolicy policy = new SegmentRoundingPolicy(1024);
+ try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy)) {
+ ArrowBuf buf = allocator.buffer(798);
+ assertEquals(1024, buf.capacity());
+ buf.setInt(333, 959);
+ assertEquals(959, buf.getInt(333));
+ buf.close();
+
+ buf = allocator.buffer(1025);
+ assertEquals(2048, buf.capacity());
+ buf.setInt(193, 939);
+ assertEquals(939, buf.getInt(193));
+ buf.close();
+ }
+ }
+
+ @Test
+ public void testSegmentAllocator_childAllocator() {
+ RoundingPolicy policy = new SegmentRoundingPolicy(1024);
+ try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy);
+ BufferAllocator childAllocator = allocator.newChildAllocator("child", 0, 512 * 1024)) {
+
+ assertEquals("child", childAllocator.getName());
+
+ ArrowBuf buf = childAllocator.buffer(798);
+ assertEquals(1024, buf.capacity());
+ buf.setInt(333, 959);
+ assertEquals(959, buf.getInt(333));
+ buf.close();
+
+ buf = childAllocator.buffer(1025);
+ assertEquals(2048, buf.capacity());
+ buf.setInt(193, 939);
+ assertEquals(939, buf.getInt(193));
+ buf.close();
+ }
+ }
+
+ @Test
+ public void testSegmentAllocator_smallSegment() {
+ IllegalArgumentException e = Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new SegmentRoundingPolicy(128)
+ );
+ assertEquals("The segment size cannot be smaller than 1024", e.getMessage());
+ }
+
+ @Test
+ public void testSegmentAllocator_segmentSizeNotPowerOf2() {
+ IllegalArgumentException e = Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new SegmentRoundingPolicy(4097)
+ );
+ assertEquals("The segment size must be a power of 2", e.getMessage());
+ }
+
+ @Test
+ public void testCustomizedAllocationManager() {
+ try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) {
+ final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf1);
+
+ arrowBuf1.setInt(0, 1);
+ assertEquals(1, arrowBuf1.getInt(0));
+
+ try {
+ final ArrowBuf arrowBuf2 = allocator.buffer(1);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ arrowBuf1.getReferenceManager().release();
+
+ try {
+ arrowBuf1.getInt(0);
+ fail("data read from released buffer");
+ } catch (RuntimeException e) {
+ // expected
+ }
+ }
+ }
+
+ private BaseAllocator createAllocatorWithCustomizedAllocationManager() {
+ return new RootAllocator(BaseAllocator.configBuilder()
+ .maxAllocation(MAX_ALLOCATION)
+ .allocationManagerFactory(new AllocationManager.Factory() {
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long requestedSize) {
+ return new AllocationManager(accountingAllocator) {
+ private final Unsafe unsafe = getUnsafe();
+ private final long address = unsafe.allocateMemory(requestedSize);
+
+ @Override
+ protected long memoryAddress() {
+ return address;
+ }
+
+ @Override
+ protected void release0() {
+ unsafe.setMemory(address, requestedSize, (byte) 0);
+ unsafe.freeMemory(address);
+ }
+
+ @Override
+ public long getSize() {
+ return requestedSize;
+ }
+
+ private Unsafe getUnsafe() {
+ Field f = null;
+ try {
+ f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (f != null) {
+ f.setAccessible(false);
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ return null;
+ }
+ }).build());
+ }
+
+ // Allocation listener
+ // It counts the number of times it has been invoked, and how much memory allocation it has seen
+ // When set to 'expand on fail', it attempts to expand the associated allocator's limit
+ private static final class TestAllocationListener implements AllocationListener {
+ private int numPreCalls;
+ private int numCalls;
+ private int numReleaseCalls;
+ private int numChildren;
+ private long totalMem;
+ private boolean expandOnFail;
+ BufferAllocator expandAlloc;
+ long expandLimit;
+
+ TestAllocationListener() {
+ this.numCalls = 0;
+ this.numChildren = 0;
+ this.totalMem = 0;
+ this.expandOnFail = false;
+ this.expandAlloc = null;
+ this.expandLimit = 0;
+ }
+
+ @Override
+ public void onPreAllocation(long size) {
+ numPreCalls++;
+ }
+
+ @Override
+ public void onAllocation(long size) {
+ numCalls++;
+ totalMem += size;
+ }
+
+ @Override
+ public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+ if (expandOnFail) {
+ expandAlloc.setLimit(expandLimit);
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
+ public void onRelease(long size) {
+ numReleaseCalls++;
+ }
+
+ @Override
+ public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
+ ++numChildren;
+ }
+
+ @Override
+ public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
+ --numChildren;
+ }
+
+ void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+ this.expandOnFail = true;
+ this.expandAlloc = expandAlloc;
+ this.expandLimit = expandLimit;
+ }
+
+ int getNumPreCalls() {
+ return numPreCalls;
+ }
+
+ int getNumReleaseCalls() {
+ return numReleaseCalls;
+ }
+
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ int getNumChildren() {
+ return numChildren;
+ }
+
+ long getTotalMem() {
+ return totalMem;
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listeners() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumPreCalls());
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getNumReleaseCalls());
+ assertEquals(0, l1.getNumChildren());
+ assertEquals(0, l1.getTotalMem());
+ TestAllocationListener l2 = new TestAllocationListener();
+ assertEquals(0, l2.getNumPreCalls());
+ assertEquals(0, l2.getNumCalls());
+ assertEquals(0, l2.getNumReleaseCalls());
+ assertEquals(0, l2.getNumChildren());
+ assertEquals(0, l2.getTotalMem());
+ // root and first-level child share the first listener
+ // second-level and third-level child share the second listener
+ try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) {
+ assertEquals(1, l1.getNumChildren());
+ final ArrowBuf buf1 = c1.buffer(16);
+ assertNotNull("allocation failed", buf1);
+ assertEquals(1, l1.getNumPreCalls());
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(0, l1.getNumReleaseCalls());
+ assertEquals(16, l1.getTotalMem());
+ buf1.getReferenceManager().release();
+ try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) {
+ assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified
+ assertEquals(0, l2.getNumChildren());
+ final ArrowBuf buf2 = c2.buffer(32);
+ assertNotNull("allocation failed", buf2);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(1, l2.getNumPreCalls());
+ assertEquals(1, l2.getNumCalls());
+ assertEquals(0, l2.getNumReleaseCalls());
+ assertEquals(32, l2.getTotalMem());
+ buf2.getReferenceManager().release();
+ try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
+ assertEquals(2, l1.getNumChildren());
+ assertEquals(1, l2.getNumChildren());
+ final ArrowBuf buf3 = c3.buffer(64);
+ assertNotNull("allocation failed", buf3);
+ assertEquals(1, l1.getNumPreCalls());
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(1, l1.getNumReleaseCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(2, l2.getNumPreCalls());
+ assertEquals(2, l2.getNumCalls());
+ assertEquals(1, l2.getNumReleaseCalls());
+ assertEquals(32 + 64, l2.getTotalMem());
+ buf3.getReferenceManager().release();
+ }
+ assertEquals(2, l1.getNumChildren());
+ assertEquals(0, l2.getNumChildren()); // third-level child removed
+ }
+ assertEquals(1, l1.getNumChildren()); // second-level child removed
+ assertEquals(0, l2.getNumChildren());
+ }
+ assertEquals(0, l1.getNumChildren()); // first-level child removed
+
+ assertEquals(2, l2.getNumReleaseCalls());
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listenerAllocationFail() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+ // Test attempts to allocate too much from a child whose limit is set to half of the max
+ // allocation. The listener's callback triggers, expanding the child allocator's limit, so then
+ // the allocation succeeds.
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1, 0,
+ MAX_ALLOCATION / 2)) {
+ try {
+ c1.buffer(MAX_ALLOCATION);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+
+ l1.setExpandOnFail(c1, MAX_ALLOCATION);
+ ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(MAX_ALLOCATION, l1.getTotalMem());
+ arrowBuf.getReferenceManager().release();
+ }
+ }
+ }
+
+ private static void allocateAndFree(final BufferAllocator allocator) {
+ final ArrowBuf arrowBuf = allocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ arrowBuf.getReferenceManager().release();
+
+ final ArrowBuf arrowBuf2 = allocator.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf2);
+ arrowBuf2.getReferenceManager().release();
+
+ final int nBufs = 8;
+ final ArrowBuf[] arrowBufs = new ArrowBuf[nBufs];
+ for (int i = 0; i < arrowBufs.length; ++i) {
+ ArrowBuf arrowBufi = allocator.buffer(MAX_ALLOCATION / nBufs);
+ assertNotNull("allocation failed", arrowBufi);
+ arrowBufs[i] = arrowBufi;
+ }
+ for (ArrowBuf arrowBufi : arrowBufs) {
+ arrowBufi.getReferenceManager().release();
+ }
+ }
+
+ @Test
+ public void testAllocator_manyAllocations() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) {
+ allocateAndFree(childAllocator);
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_overAllocate() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) {
+ allocateAndFree(childAllocator);
+
+ try {
+ childAllocator.buffer(MAX_ALLOCATION + 1);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_overAllocateParent() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf1);
+ final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf2);
+
+ try {
+ childAllocator.buffer(MAX_ALLOCATION / 4);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+
+ arrowBuf1.getReferenceManager().release();
+ arrowBuf2.getReferenceManager().release();
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("child", 0, MAX_ALLOCATION / 2)) {
+ try (final BufferAllocator grandChildAllocator =
+ childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, MAX_ALLOCATION)) {
+ OutOfMemoryException e = assertThrows(OutOfMemoryException.class,
+ () -> grandChildAllocator.buffer(MAX_ALLOCATION));
+ // expected
+ assertTrue(e.getMessage().contains("Unable to allocate buffer"));
+
+ assertTrue("missing outcome details", e.getOutcomeDetails().isPresent());
+ AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get();
+
+ assertEquals(outcomeDetails.getFailedAllocator(), childAllocator);
+
+ // The order of allocators should be child to root (request propagates to parent if
+ // child cannot satisfy the request).
+ Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator();
+ AllocationOutcomeDetails.Entry first = iterator.next();
+ assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize());
+ assertEquals(MAX_ALLOCATION, first.getRequestedSize());
+ assertEquals(false, first.isAllocationFailed());
+
+ AllocationOutcomeDetails.Entry second = iterator.next();
+ assertEquals(MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize());
+ assertEquals(0, second.getAllocatedSize());
+ assertEquals(true, second.isAllocationFailed());
+
+ assertFalse(iterator.hasNext());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_failureAtRootLimitOutcomeDetails() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("child", MAX_ALLOCATION / 2, Long.MAX_VALUE)) {
+ try (final BufferAllocator grandChildAllocator =
+ childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, Long.MAX_VALUE)) {
+ OutOfMemoryException e = assertThrows(OutOfMemoryException.class,
+ () -> grandChildAllocator.buffer(MAX_ALLOCATION * 2));
+
+ assertTrue(e.getMessage().contains("Unable to allocate buffer"));
+ assertTrue("missing outcome details", e.getOutcomeDetails().isPresent());
+ AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get();
+
+ assertEquals(outcomeDetails.getFailedAllocator(), rootAllocator);
+
+ // The order of allocators should be child to root (request propagates to parent if
+ // child cannot satisfy the request).
+ Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator();
+ AllocationOutcomeDetails.Entry first = iterator.next();
+ assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize());
+ assertEquals(2 * MAX_ALLOCATION, first.getRequestedSize());
+ assertEquals(false, first.isAllocationFailed());
+
+ AllocationOutcomeDetails.Entry second = iterator.next();
+ assertEquals(MAX_ALLOCATION / 4, second.getAllocatedSize());
+ assertEquals(2 * MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize());
+ assertEquals(false, second.isAllocationFailed());
+
+ AllocationOutcomeDetails.Entry third = iterator.next();
+ assertEquals(0, third.getAllocatedSize());
+ assertEquals(true, third.isAllocationFailed());
+
+ assertFalse(iterator.hasNext());
+ }
+ }
+ }
+ }
+
+ private static void testAllocator_sliceUpBufferAndRelease(
+ final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
+ final ArrowBuf arrowBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2);
+ rootAllocator.verify();
+
+ final ArrowBuf arrowBuf2 = arrowBuf1.slice(16, arrowBuf1.capacity() - 32);
+ rootAllocator.verify();
+ final ArrowBuf arrowBuf3 = arrowBuf2.slice(16, arrowBuf2.capacity() - 32);
+ rootAllocator.verify();
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf4 = arrowBuf3.slice(16, arrowBuf3.capacity() - 32);
+ rootAllocator.verify();
+
+ arrowBuf3.getReferenceManager().release(); // since they share refcounts, one is enough to release them all
+ rootAllocator.verify();
+ }
+
+ @Test
+ public void testAllocator_createSlices() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
+
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0,
+ MAX_ALLOCATION)) {
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ }
+ rootAllocator.verify();
+
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
+
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0,
+ MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator2 =
+ childAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf2 = arrowBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ arrowBuf1.getReferenceManager().release();
+ rootAllocator.verify();
+ }
+ rootAllocator.verify();
+
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ }
+ rootAllocator.verify();
+ }
+ }
+
+ @Test
+ public void testAllocator_sliceRanges() throws Exception {
+ // final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ // Populate a buffer with byte values corresponding to their indices.
+ final ArrowBuf arrowBuf = rootAllocator.buffer(256);
+ assertEquals(256, arrowBuf.capacity());
+ assertEquals(0, arrowBuf.readerIndex());
+ assertEquals(0, arrowBuf.readableBytes());
+ assertEquals(0, arrowBuf.writerIndex());
+ assertEquals(256, arrowBuf.writableBytes());
+
+ final ArrowBuf slice3 = arrowBuf.slice();
+ assertEquals(0, slice3.readerIndex());
+ assertEquals(0, slice3.readableBytes());
+ assertEquals(0, slice3.writerIndex());
+ // assertEquals(256, slice3.capacity());
+ // assertEquals(256, slice3.writableBytes());
+
+ for (int i = 0; i < 256; ++i) {
+ arrowBuf.writeByte(i);
+ }
+ assertEquals(0, arrowBuf.readerIndex());
+ assertEquals(256, arrowBuf.readableBytes());
+ assertEquals(256, arrowBuf.writerIndex());
+ assertEquals(0, arrowBuf.writableBytes());
+
+ final ArrowBuf slice1 = arrowBuf.slice();
+ assertEquals(0, slice1.readerIndex());
+ assertEquals(256, slice1.readableBytes());
+ for (int i = 0; i < 10; ++i) {
+ assertEquals(i, slice1.readByte());
+ }
+ assertEquals(256 - 10, slice1.readableBytes());
+ for (int i = 0; i < 256; ++i) {
+ assertEquals((byte) i, slice1.getByte(i));
+ }
+
+ final ArrowBuf slice2 = arrowBuf.slice(25, 25);
+ assertEquals(0, slice2.readerIndex());
+ assertEquals(25, slice2.readableBytes());
+ for (int i = 25; i < 50; ++i) {
+ assertEquals(i, slice2.readByte());
+ }
+
+ /*
+ for(int i = 256; i > 0; --i) {
+ slice3.writeByte(i - 1);
+ }
+ for(int i = 0; i < 256; ++i) {
+ assertEquals(255 - i, slice1.getByte(i));
+ }
+ */
+
+ arrowBuf.getReferenceManager().release(); // all the derived buffers share this fate
+ }
+ }
+
+ @Test
+ public void testAllocator_slicesOfSlices() throws Exception {
+ // final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ // Populate a buffer with byte values corresponding to their indices.
+ final ArrowBuf arrowBuf = rootAllocator.buffer(256);
+ for (int i = 0; i < 256; ++i) {
+ arrowBuf.writeByte(i);
+ }
+
+ // Slice it up.
+ final ArrowBuf slice0 = arrowBuf.slice(0, arrowBuf.capacity());
+ for (int i = 0; i < 256; ++i) {
+ assertEquals((byte) i, arrowBuf.getByte(i));
+ }
+
+ final ArrowBuf slice10 = slice0.slice(10, arrowBuf.capacity() - 10);
+ for (int i = 10; i < 256; ++i) {
+ assertEquals((byte) i, slice10.getByte(i - 10));
+ }
+
+ final ArrowBuf slice20 = slice10.slice(10, arrowBuf.capacity() - 20);
+ for (int i = 20; i < 256; ++i) {
+ assertEquals((byte) i, slice20.getByte(i - 20));
+ }
+
+ final ArrowBuf slice30 = slice20.slice(10, arrowBuf.capacity() - 30);
+ for (int i = 30; i < 256; ++i) {
+ assertEquals((byte) i, slice30.getByte(i - 30));
+ }
+
+ arrowBuf.getReferenceManager().release();
+ }
+ }
+
+ @Test
+ public void testAllocator_transferSliced() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+ final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+
+ final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2);
+ final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2);
+
+ rootAllocator.verify();
+
+ OwnershipTransferResult result1 = arrowBuf2s.getReferenceManager().transferOwnership(arrowBuf2s, childAllocator1);
+ assertEquiv(arrowBuf2s, result1.getTransferredBuffer());
+ rootAllocator.verify();
+ OwnershipTransferResult result2 = arrowBuf1s.getReferenceManager().transferOwnership(arrowBuf1s, childAllocator2);
+ assertEquiv(arrowBuf1s, result2.getTransferredBuffer());
+ rootAllocator.verify();
+
+ result1.getTransferredBuffer().getReferenceManager().release();
+ result2.getTransferredBuffer().getReferenceManager().release();
+
+ arrowBuf1s.getReferenceManager().release(); // releases arrowBuf1
+ arrowBuf2s.getReferenceManager().release(); // releases arrowBuf2
+
+ childAllocator1.close();
+ childAllocator2.close();
+ }
+ }
+
+ @Test
+ public void testAllocator_shareSliced() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+ final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+
+ final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2);
+ final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2);
+
+ rootAllocator.verify();
+
+ final ArrowBuf arrowBuf2s1 = arrowBuf2s.getReferenceManager().retain(arrowBuf2s, childAllocator1);
+ assertEquiv(arrowBuf2s, arrowBuf2s1);
+ final ArrowBuf arrowBuf1s2 = arrowBuf1s.getReferenceManager().retain(arrowBuf1s, childAllocator2);
+ assertEquiv(arrowBuf1s, arrowBuf1s2);
+ rootAllocator.verify();
+
+ arrowBuf1s.getReferenceManager().release(); // releases arrowBuf1
+ arrowBuf2s.getReferenceManager().release(); // releases arrowBuf2
+ rootAllocator.verify();
+
+ arrowBuf2s1.getReferenceManager().release(); // releases the shared arrowBuf2 slice
+ arrowBuf1s2.getReferenceManager().release(); // releases the shared arrowBuf1 slice
+
+ childAllocator1.close();
+ childAllocator2.close();
+ }
+ }
+
+ @Test
+ public void testAllocator_transferShared() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+
+ boolean allocationFit;
+
+ ArrowBuf arrowBuf2 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator2);
+ rootAllocator.verify();
+ assertNotNull(arrowBuf2);
+ assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
+
+ final ReferenceManager refManager1 = arrowBuf1.getReferenceManager();
+ final OwnershipTransferResult result1 = refManager1.transferOwnership(arrowBuf1, childAllocator3);
+ allocationFit = result1.getAllocationFit();
+ final ArrowBuf arrowBuf3 = result1.getTransferredBuffer();
+ assertTrue(allocationFit);
+ assertEquiv(arrowBuf1, arrowBuf3);
+ rootAllocator.verify();
+
+ // Since childAllocator3 now has childAllocator1's buffer, 1, can close
+ arrowBuf1.getReferenceManager().release();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ arrowBuf2.getReferenceManager().release();
+ childAllocator2.close();
+ rootAllocator.verify();
+
+ final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION);
+ final ReferenceManager refManager3 = arrowBuf3.getReferenceManager();
+ final OwnershipTransferResult result3 = refManager3.transferOwnership(arrowBuf3, childAllocator4);
+ allocationFit = result3.getAllocationFit();
+ final ArrowBuf arrowBuf4 = result3.getTransferredBuffer();
+ assertTrue(allocationFit);
+ assertEquiv(arrowBuf3, arrowBuf4);
+ rootAllocator.verify();
+
+ arrowBuf3.getReferenceManager().release();
+ childAllocator3.close();
+ rootAllocator.verify();
+
+ arrowBuf4.getReferenceManager().release();
+ childAllocator4.close();
+ rootAllocator.verify();
+ }
+ }
+
+ @Test
+ public void testAllocator_unclaimedReservation() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
+ try (final AllocationReservation reservation = childAllocator1.newReservation()) {
+ assertTrue(reservation.add(64));
+ }
+ rootAllocator.verify();
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_claimedReservation() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+
+ try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator(
+ "claimedReservation", 0, MAX_ALLOCATION)) {
+
+ try (final AllocationReservation reservation = childAllocator1.newReservation()) {
+ assertTrue(reservation.add(32));
+ assertTrue(reservation.add(32));
+
+ final ArrowBuf arrowBuf = reservation.allocateBuffer();
+ assertEquals(64, arrowBuf.capacity());
+ rootAllocator.verify();
+
+ arrowBuf.getReferenceManager().release();
+ rootAllocator.verify();
+ }
+ rootAllocator.verify();
+ }
+ }
+ }
+
+ @Test
+ public void testInitReservationAndLimit() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator(
+ "child", 2048, 4096)) {
+ assertEquals(2048, childAllocator.getInitReservation());
+ assertEquals(4096, childAllocator.getLimit());
+ }
+ }
+ }
+
+ @Test
+ public void multiple() throws Exception {
+ final String owner = "test";
+ try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+ final int op = 100000;
+
+ BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE);
+ BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b11 = allocator11.buffer(1000000);
+
+ allocator.verify();
+
+ BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b12 = allocator12.buffer(500000);
+
+ allocator.verify();
+
+ BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b22 = allocator22.buffer(2000000);
+
+ allocator.verify();
+
+ BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b31a = allocator31.buffer(200000);
+
+ allocator.verify();
+
+ // Previously running operator completes
+ b22.getReferenceManager().release();
+
+ allocator.verify();
+
+ allocator22.close();
+
+ b31a.getReferenceManager().release();
+ allocator31.close();
+
+ b12.getReferenceManager().release();
+ allocator12.close();
+
+ allocator21.close();
+
+ b11.getReferenceManager().release();
+ allocator11.close();
+
+ frag1.close();
+ frag2.close();
+ frag3.close();
+
+ }
+ }
+
+ // This test needs to run in non-debug mode. So disabling the assertion status through class loader for this.
+ // The test passes if run individually with -Dtest=TestBaseAllocator#testMemoryLeakWithReservation
+ // but fails generally since the assertion status cannot be changed once the class is initialized.
+ // So setting the test to @ignore
+ @Test(expected = IllegalStateException.class)
+ @Ignore
+ public void testMemoryLeakWithReservation() throws Exception {
+ // disabling assertion status
+ AssertionUtil.class.getClassLoader().setClassAssertionStatus(AssertionUtil.class.getName(), false);
+ try (RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ ChildAllocator childAllocator1 = (ChildAllocator) rootAllocator.newChildAllocator(
+ "child1", 1024, MAX_ALLOCATION);
+ rootAllocator.verify();
+
+ ChildAllocator childAllocator2 = (ChildAllocator) childAllocator1.newChildAllocator(
+ "child2", 1024, MAX_ALLOCATION);
+ rootAllocator.verify();
+
+ ArrowBuf buff = childAllocator2.buffer(256);
+
+ Exception exception = assertThrows(IllegalStateException.class, () -> {
+ childAllocator2.close();
+ });
+ String exMessage = exception.getMessage();
+ assertTrue(exMessage.contains("Memory leaked: (256)"));
+
+ exception = assertThrows(IllegalStateException.class, () -> {
+ childAllocator1.close();
+ });
+ exMessage = exception.getMessage();
+ assertTrue(exMessage.contains("Memory leaked: (256)"));
+ }
+ }
+
+ public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
+ assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
+ assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java
new file mode 100644
index 000000000..3fd7ce74a
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.PooledByteBufAllocatorL;
+
+public class TestEmptyArrowBuf {
+
+ private static final int MAX_ALLOCATION = 8 * 1024;
+ private static RootAllocator allocator;
+
+ @BeforeClass
+ public static void beforeClass() {
+ allocator = new RootAllocator(MAX_ALLOCATION);
+ }
+
+ /** Ensure the allocator is closed. */
+ @AfterClass
+ public static void afterClass() {
+ if (allocator != null) {
+ allocator.close();
+ }
+ }
+
+ @Test
+ public void testZeroBuf() {
+ // Exercise the historical log inside the empty ArrowBuf. This is initialized statically, and there is a circular
+ // dependency between ArrowBuf and BaseAllocator, so if the initialization happens in the wrong order, the
+ // historical log will be null even though BaseAllocator.DEBUG is true.
+ allocator.getEmpty().print(new StringBuilder(), 0, BaseAllocator.Verbosity.LOG_WITH_STACKTRACE);
+ }
+
+ @Test
+ public void testEmptyArrowBuf() {
+ ArrowBuf buf = new ArrowBuf(ReferenceManager.NO_OP, null,
+ 1024, new PooledByteBufAllocatorL().empty.memoryAddress());
+
+ buf.getReferenceManager().retain();
+ buf.getReferenceManager().retain(8);
+ assertEquals(1024, buf.capacity());
+ assertEquals(1, buf.getReferenceManager().getRefCount());
+ assertEquals(0, buf.getActualMemoryConsumed());
+
+ for (int i = 0; i < 10; i++) {
+ buf.setByte(i, i);
+ }
+ assertEquals(0, buf.getActualMemoryConsumed());
+ assertEquals(0, buf.getReferenceManager().getSize());
+ assertEquals(0, buf.getReferenceManager().getAccountedSize());
+ assertEquals(false, buf.getReferenceManager().release());
+ assertEquals(false, buf.getReferenceManager().release(2));
+ assertEquals(0, buf.getReferenceManager().getAllocator().getLimit());
+ assertEquals(buf, buf.getReferenceManager().transferOwnership(buf, allocator).getTransferredBuffer());
+ assertEquals(0, buf.readerIndex());
+ assertEquals(0, buf.writerIndex());
+ assertEquals(1, buf.refCnt());
+
+ ArrowBuf derive = buf.getReferenceManager().deriveBuffer(buf, 0, 100);
+ assertEquals(derive, buf);
+ assertEquals(1, buf.refCnt());
+ assertEquals(1, derive.refCnt());
+
+ buf.close();
+
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java
new file mode 100644
index 000000000..dcaeb2488
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteOrder;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.NettyArrowBuf;
+
+public class TestEndianness {
+
+ @Test
+ public void testNativeEndian() {
+ final BufferAllocator a = new RootAllocator(10000);
+ final ByteBuf b = NettyArrowBuf.unwrapBuffer(a.buffer(4));
+ b.setInt(0, 35);
+ if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+ assertEquals(b.getByte(0), 35);
+ assertEquals(b.getByte(1), 0);
+ assertEquals(b.getByte(2), 0);
+ assertEquals(b.getByte(3), 0);
+ } else {
+ assertEquals(b.getByte(0), 0);
+ assertEquals(b.getByte(1), 0);
+ assertEquals(b.getByte(2), 0);
+ assertEquals(b.getByte(3), 35);
+ }
+ b.release();
+ a.close();
+ }
+
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java
new file mode 100644
index 000000000..1b64cd733
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Test cases for {@link NettyAllocationManager}.
+ */
+public class TestNettyAllocationManager {
+
+ static int CUSTOMIZED_ALLOCATION_CUTOFF_VALUE = 1024;
+
+ private BaseAllocator createCustomizedAllocator() {
+ return new RootAllocator(BaseAllocator.configBuilder()
+ .allocationManagerFactory(new AllocationManager.Factory() {
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long size) {
+ return new NettyAllocationManager(accountingAllocator, size, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE);
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ return null;
+ }
+ }).build());
+ }
+
+ private void readWriteArrowBuf(ArrowBuf buffer) {
+ // write buffer
+ for (long i = 0; i < buffer.capacity() / 8; i++) {
+ buffer.setLong(i * 8, i);
+ }
+
+ // read buffer
+ for (long i = 0; i < buffer.capacity() / 8; i++) {
+ long val = buffer.getLong(i * 8);
+ assertEquals(i, val);
+ }
+ }
+
+ /**
+ * Test the allocation strategy for small buffers..
+ */
+ @Test
+ public void testSmallBufferAllocation() {
+ final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE - 512L;
+ try (BaseAllocator allocator = createCustomizedAllocator();
+ ArrowBuf buffer = allocator.buffer(bufSize)) {
+
+ assertTrue(buffer.getReferenceManager() instanceof BufferLedger);
+ BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager();
+
+ // make sure we are using netty allocation manager
+ AllocationManager allocMgr = bufferLedger.getAllocationManager();
+ assertTrue(allocMgr instanceof NettyAllocationManager);
+ NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr;
+
+ // for the small buffer allocation strategy, the chunk is not null
+ assertNotNull(nettyMgr.getMemoryChunk());
+
+ readWriteArrowBuf(buffer);
+ }
+ }
+
+ /**
+ * Test the allocation strategy for large buffers..
+ */
+ @Test
+ public void testLargeBufferAllocation() {
+ final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE + 1024L;
+ try (BaseAllocator allocator = createCustomizedAllocator();
+ ArrowBuf buffer = allocator.buffer(bufSize)) {
+ assertTrue(buffer.getReferenceManager() instanceof BufferLedger);
+ BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager();
+
+ // make sure we are using netty allocation manager
+ AllocationManager allocMgr = bufferLedger.getAllocationManager();
+ assertTrue(allocMgr instanceof NettyAllocationManager);
+ NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr;
+
+ // for the large buffer allocation strategy, the chunk is null
+ assertNull(nettyMgr.getMemoryChunk());
+
+ readWriteArrowBuf(buffer);
+ }
+ }
+}
diff --git a/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml b/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml
new file mode 100644
index 000000000..4c54d18a2
--- /dev/null
+++ b/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<configuration>
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.arrow" additivity="false">
+ <level value="info" />
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+</configuration>