diff options
Diffstat (limited to 'src/arrow/java/memory/memory-netty')
19 files changed, 3891 insertions, 0 deletions
diff --git a/src/arrow/java/memory/memory-netty/pom.xml b/src/arrow/java/memory/memory-netty/pom.xml new file mode 100644 index 000000000..dee06a321 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>arrow-memory</artifactId> + <groupId>org.apache.arrow</groupId> + <version>6.0.1</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>arrow-memory-netty</artifactId> + <name>Arrow Memory - Netty</name> + <description>Netty allocator and utils for allocating memory in Arrow</description> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.immutables</groupId> + <artifactId>value</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <!-- This profile turns on integration testing. It activates the failsafe plugin and will run any tests + with the 'IT' prefix. This should be run in a separate CI build or on developers machines as it potentially + uses quite a bit of memory. Activate the tests by adding -Pintegration-tests to your maven command line --> + <id>integration-tests</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java new file mode 100644 index 000000000..09b730044 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/ExpandableByteBuf.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import org.apache.arrow.memory.BufferAllocator; + +/** + * Allows us to decorate ArrowBuf to make it expandable so that we can use them in the context of + * the Netty framework + * (thus supporting RPC level memory accounting). + */ +public class ExpandableByteBuf extends MutableWrappedByteBuf { + + private final BufferAllocator allocator; + + public ExpandableByteBuf(ByteBuf buffer, BufferAllocator allocator) { + super(buffer); + this.allocator = allocator; + } + + @Override + public ByteBuf copy(int index, int length) { + return new ExpandableByteBuf(buffer.copy(index, length), allocator); + } + + @Override + public ByteBuf capacity(int newCapacity) { + if (newCapacity > capacity()) { + ByteBuf newBuf = NettyArrowBuf.unwrapBuffer(allocator.buffer(newCapacity)); + newBuf.writeBytes(buffer, 0, buffer.capacity()); + newBuf.readerIndex(buffer.readerIndex()); + newBuf.writerIndex(buffer.writerIndex()); + buffer.release(); + buffer = newBuf; + return newBuf; + } else { + return super.capacity(newCapacity); + } + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java new file mode 100644 index 000000000..792b3b814 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/LargeBuffer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +/** + * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and + * counts. + */ +public class LargeBuffer extends MutableWrappedByteBuf { + + public LargeBuffer(ByteBuf buffer) { + super(buffer); + } + + @Override + public ByteBuf copy(int index, int length) { + return new LargeBuffer(buffer.copy(index, length)); + } +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java new file mode 100644 index 000000000..5221dd3c1 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +import io.netty.util.ByteProcessor; + +/** + * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override + * some behaviors and make buffer mutable. + */ +abstract class MutableWrappedByteBuf extends AbstractByteBuf { + + ByteBuf buffer; + + public MutableWrappedByteBuf(ByteBuf buffer) { + super(buffer.maxCapacity()); + + if (buffer instanceof MutableWrappedByteBuf) { + this.buffer = ((MutableWrappedByteBuf) buffer).buffer; + } else { + this.buffer = buffer; + } + + setIndex(buffer.readerIndex(), buffer.writerIndex()); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return unwrap().nioBuffer(index, length); + } + + @Override + public ByteBuf unwrap() { + return buffer; + } + + @Override + public ByteBufAllocator alloc() { + return buffer.alloc(); + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public boolean isDirect() { + return buffer.isDirect(); + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + buffer.capacity(newCapacity); + return this; + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset(); + } + + @Override + public boolean hasMemoryAddress() { + return buffer.hasMemoryAddress(); + } + + @Override + public long memoryAddress() { + return buffer.memoryAddress(); + } + + @Override + public byte getByte(int index) { + return _getByte(index); + } + + @Override + protected byte _getByte(int index) { + return buffer.getByte(index); + } + + @Override + public short getShort(int index) { + return _getShort(index); + } + + @Override + protected short _getShort(int index) { + return buffer.getShort(index); + } + + @Override + public short getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + protected short _getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + public int getUnsignedMedium(int index) { + return _getUnsignedMedium(index); + } + + @Override + protected int _getUnsignedMedium(int index) { + return buffer.getUnsignedMedium(index); + } + + @Override + public int getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + public int getInt(int index) { + return _getInt(index); + } + + @Override + protected int _getInt(int index) { + return buffer.getInt(index); + } + + @Override + public int getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + protected int _getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + public long getLong(int index) { + return _getLong(index); + } + + @Override + protected long _getLong(int index) { + return buffer.getLong(index); + } + + @Override + public long getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + protected long _getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + public abstract ByteBuf copy(int index, int length); + + @Override + public ByteBuf slice(int index, int length) { + return new SlicedByteBuf(this, index, length); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + buffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + buffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + buffer.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf setByte(int index, int value) { + _setByte(index, value); + return this; + } + + @Override + protected void _setByte(int index, int value) { + buffer.setByte(index, value); + } + + @Override + public ByteBuf setShort(int index, int value) { + _setShort(index, value); + return this; + } + + @Override + protected void _setShort(int index, int value) { + buffer.setShort(index, value); + } + + @Override + public ByteBuf setShortLE(int index, int value) { + buffer.setShortLE(index, value); + return this; + } + + @Override + protected void _setShortLE(int index, int value) { + buffer.setShortLE(index, value); + } + + @Override + public ByteBuf setMedium(int index, int value) { + _setMedium(index, value); + return this; + } + + @Override + protected void _setMedium(int index, int value) { + buffer.setMedium(index, value); + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + return this; + } + + @Override + protected void _setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + } + + @Override + public ByteBuf setInt(int index, int value) { + _setInt(index, value); + return this; + } + + @Override + protected void _setInt(int index, int value) { + buffer.setInt(index, value); + } + + @Override + public ByteBuf setIntLE(int index, int value) { + buffer.setIntLE(index, value); + return this; + } + + @Override + protected void _setIntLE(int index, int value) { + buffer.setIntLE(index, value); + } + + @Override + public ByteBuf setLong(int index, long value) { + _setLong(index, value); + return this; + } + + @Override + protected void _setLong(int index, long value) { + buffer.setLong(index, value); + } + + @Override + public ByteBuf setLongLE(int index, long value) { + buffer.setLongLE(index, value); + return this; + } + + @Override + protected void _setLongLE(int index, long value) { + buffer.setLongLE(index, value); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + buffer.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + buffer.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + buffer.setBytes(index, src); + return this; + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) + throws IOException { + return buffer.setBytes(index, in, position, length); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) + throws IOException { + buffer.getBytes(index, out, length); + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) + throws IOException { + return buffer.getBytes(index, out, length); + } + + @Override + public int setBytes(int index, InputStream in, int length) + throws IOException { + return buffer.setBytes(index, in, length); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) + throws IOException { + return buffer.setBytes(index, in, length); + } + + + @Override + public int getBytes(int index, FileChannel out, long position, int length) + throws IOException { + return buffer.getBytes(index, out, position, length); + } + + @Override + public int nioBufferCount() { + return buffer.nioBufferCount(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return buffer.nioBuffers(index, length); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return nioBuffer(index, length); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return buffer.forEachByte(index, length, processor); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + return buffer.forEachByteDesc(index, length, processor); + } + + @Override + public final int refCnt() { + return unwrap().refCnt(); + } + + @Override + public final ByteBuf touch() { + unwrap().touch(); + return this; + } + + @Override + public final ByteBuf touch(Object hint) { + unwrap().touch(hint); + return this; + } + + @Override + public final ByteBuf retain() { + unwrap().retain(); + return this; + } + + @Override + public final ByteBuf retain(int increment) { + unwrap().retain(increment); + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + boolean released = unwrap().release(decrement); + return released; + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java new file mode 100644 index 000000000..8681b005f --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/NettyArrowBuf.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.ArrowByteBufAllocator; +import org.apache.arrow.memory.BoundsChecking; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; + +import io.netty.util.internal.PlatformDependent; + +/** + * Netty specific wrapper over ArrowBuf for use in Netty framework. + */ +public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable { + + private final ArrowBuf arrowBuf; + private final ArrowByteBufAllocator arrowByteBufAllocator; + private int length; + private final long address; + + /** + * Constructs a new instance. + * + * @param arrowBuf The buffer to wrap. + * @param bufferAllocator The allocator for the buffer. + * @param length The length of this buffer. + */ + public NettyArrowBuf( + final ArrowBuf arrowBuf, + final BufferAllocator bufferAllocator, + final int length) { + super(length); + this.arrowBuf = arrowBuf; + this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator); + this.length = length; + this.address = arrowBuf.memoryAddress(); + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain() { + arrowBuf.getReferenceManager().retain(); + return this; + } + + public ArrowBuf arrowBuf() { + return arrowBuf; + } + + @Override + public ByteBuf retain(final int increment) { + arrowBuf.getReferenceManager().retain(increment); + return this; + } + + @Override + public boolean isDirect() { + return true; + } + + @Override + public synchronized ByteBuf capacity(int newCapacity) { + if (newCapacity == length) { + return this; + } + Preconditions.checkArgument(newCapacity >= 0); + if (newCapacity < length) { + length = newCapacity; + return this; + } + throw new UnsupportedOperationException("Buffers don't support resizing that increases the size."); + } + + @Override + public ByteBuf unwrap() { + throw new UnsupportedOperationException("Unwrap not supported."); + } + + @Override + public int refCnt() { + return arrowBuf.getReferenceManager().getRefCount(); + } + + @Override + public ArrowByteBufAllocator alloc() { + return arrowByteBufAllocator; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("Operation not supported on direct buffer"); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("Operation not supported on direct buffer"); + } + + @Override + public boolean hasMemoryAddress() { + return true; + } + + @Override + public long memoryAddress() { + return this.address; + } + + @Override + public ByteBuf touch() { + return this; + } + + @Override + public ByteBuf touch(Object hint) { + return this; + } + + @Override + public int capacity() { + return (int) Math.min(Integer.MAX_VALUE, arrowBuf.capacity()); + } + + @Override + public NettyArrowBuf slice() { + return unwrapBuffer(arrowBuf.slice(readerIndex, writerIndex - readerIndex)); + } + + @Override + public NettyArrowBuf slice(int index, int length) { + return unwrapBuffer(arrowBuf.slice(index, length)); + } + + @Override + public void close() { + arrowBuf.close(); + } + + @Override + public boolean release() { + return arrowBuf.getReferenceManager().release(); + } + + @Override + public boolean release(int decrement) { + return arrowBuf.getReferenceManager().release(decrement); + } + + @Override + public NettyArrowBuf readerIndex(int readerIndex) { + super.readerIndex(readerIndex); + return this; + } + + @Override + public NettyArrowBuf writerIndex(int writerIndex) { + super.writerIndex(writerIndex); + return this; + } + + @Override + public int nioBufferCount() { + return 1; + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + ByteBuffer nioBuf = getDirectBuffer(index); + // Follows convention from other ByteBuf implementations. + return (ByteBuffer) nioBuf.clear().limit(length); + } + + @Override + public ByteBuffer[] nioBuffers() { + return new ByteBuffer[] {nioBuffer()}; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] {nioBuffer(index, length)}; + } + + @Override + public ByteBuffer nioBuffer() { + return nioBuffer(readerIndex(), readableBytes()); + } + + + /** + * Returns a buffer that is zero positioned but points + * to a slice of the original buffer starting at given index. + */ + @Override + public ByteBuffer nioBuffer(int index, int length) { + chk(index, length); + final ByteBuffer buffer = getDirectBuffer(index); + buffer.limit(length); + return buffer; + } + + /** + * Returns a buffer that is zero positioned but points + * to a slice of the original buffer starting at given index. + */ + public ByteBuffer nioBuffer(long index, int length) { + chk(index, length); + final ByteBuffer buffer = getDirectBuffer(index); + buffer.limit(length); + return buffer; + } + + /** + * Get this ArrowBuf as a direct {@link ByteBuffer}. + * + * @return ByteBuffer + */ + private ByteBuffer getDirectBuffer(long index) { + return PlatformDependent.directBuffer(addr(index), checkedCastToInt(length - index)); + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + arrowBuf.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + arrowBuf.setBytes(index, src); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + arrowBuf.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + arrowBuf.setBytes(index, src, srcIndex, length); + return this; + } + + /** + * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}. + * + * @param index The starting index. + * @param length The length which will be utilized (starting from {@code index}). + * @param capacity The capacity that {@code index + length} is allowed to be within. + * @return {@code true} if the requested {@code index} and {@code length} will fit within {@code capacity}. + * {@code false} if this would result in an index out of bounds exception. + */ + private static boolean isOutOfBounds(int index, int length, int capacity) { + return (index | length | (index + length) | (capacity - (index + length))) < 0; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + chk(index, length); + Preconditions.checkArgument(dst != null, "Expecting valid dst ByteBuffer"); + if (isOutOfBounds(dstIndex, length, dst.capacity())) { + throw new IndexOutOfBoundsException("dstIndex: " + dstIndex + " length: " + length); + } else { + final long srcAddress = addr(index); + if (dst.hasMemoryAddress()) { + final long dstAddress = dst.memoryAddress() + (long) dstIndex; + PlatformDependent.copyMemory(srcAddress, dstAddress, (long) length); + } else if (dst.hasArray()) { + dstIndex += dst.arrayOffset(); + PlatformDependent.copyMemory(srcAddress, dst.array(), dstIndex, (long) length); + } else { + dst.setBytes(dstIndex, this, index, length); + } + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + chk(index, length); + Preconditions.checkArgument(src != null, "Expecting valid src ByteBuffer"); + if (isOutOfBounds(srcIndex, length, src.capacity())) { + throw new IndexOutOfBoundsException("srcIndex: " + srcIndex + " length: " + length); + } else { + if (length != 0) { + final long dstAddress = addr(index); + if (src.hasMemoryAddress()) { + final long srcAddress = src.memoryAddress() + (long) srcIndex; + PlatformDependent.copyMemory(srcAddress, dstAddress, (long) length); + } else if (src.hasArray()) { + srcIndex += src.arrayOffset(); + PlatformDependent.copyMemory(src.array(), srcIndex, dstAddress, (long) length); + } else { + src.getBytes(srcIndex, this, index, length); + } + } + } + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + arrowBuf.getBytes(index, out, length); + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + return arrowBuf.setBytes(index, in, length); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + Preconditions.checkArgument(out != null, "expecting valid gathering byte channel"); + chk(index, length); + if (length == 0) { + return 0; + } else { + final ByteBuffer tmpBuf = getDirectBuffer(index); + tmpBuf.clear().limit(length); + return out.write(tmpBuf); + } + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + chk(index, length); + if (length == 0) { + return 0; + } else { + final ByteBuffer tmpBuf = getDirectBuffer(index); + tmpBuf.clear().limit(length); + return out.write(tmpBuf, position); + } + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + return (int) in.read(nioBuffers(index, length)); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + return (int) in.read(nioBuffers(index, length)); + } + + @Override + public ByteOrder order() { + return ByteOrder.LITTLE_ENDIAN; + } + + @Override + public ByteBuf order(ByteOrder endianness) { + return this; + } + + @Override + protected int _getUnsignedMedium(int index) { + return getUnsignedMedium(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + this.chk(index, 3); + long addr = this.addr(index); + return PlatformDependent.getByte(addr) & 255 | + (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8; + } + + + /*-------------------------------------------------* + | | + | get() APIs | + | | + *-------------------------------------------------*/ + + + @Override + protected byte _getByte(int index) { + return getByte(index); + } + + @Override + public byte getByte(int index) { + return arrowBuf.getByte(index); + } + + @Override + protected short _getShortLE(int index) { + short s = getShort(index); + return Short.reverseBytes(s); + } + + @Override + protected short _getShort(int index) { + return getShort(index); + } + + @Override + public short getShort(int index) { + return arrowBuf.getShort(index); + } + + @Override + protected int _getIntLE(int index) { + int value = getInt(index); + return Integer.reverseBytes(value); + } + + @Override + protected int _getInt(int index) { + return getInt(index); + } + + @Override + public int getInt(int index) { + return arrowBuf.getInt(index); + } + + @Override + protected long _getLongLE(int index) { + long value = getLong(index); + return Long.reverseBytes(value); + } + + @Override + protected long _getLong(int index) { + return getLong(index); + } + + @Override + public long getLong(int index) { + return arrowBuf.getLong(index); + } + + + /*-------------------------------------------------* + | | + | set() APIs | + | | + *-------------------------------------------------*/ + + + @Override + protected void _setByte(int index, int value) { + setByte(index, value); + } + + @Override + public NettyArrowBuf setByte(int index, int value) { + arrowBuf.setByte(index, value); + return this; + } + + @Override + protected void _setShortLE(int index, int value) { + this.chk(index, 2); + PlatformDependent.putShort(this.addr(index), Short.reverseBytes((short) value)); + } + + @Override + protected void _setShort(int index, int value) { + setShort(index, value); + } + + @Override + public NettyArrowBuf setShort(int index, int value) { + arrowBuf.setShort(index, value); + return this; + } + + private long addr(long index) { + return address + index; + } + + /** + * Helper function to do bounds checking at a particular + * index for particular length of data. + * + * @param index index (0 based relative to this ArrowBuf) + * @param fieldLength provided length of data for get/set + */ + private void chk(long index, long fieldLength) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + // check reference count + ensureAccessible(); + // check bounds + if (fieldLength < 0) { + throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)"); + } + if (index < 0 || index > capacity() - fieldLength) { + throw new IndexOutOfBoundsException(String.format( + "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); + } + } + } + + @Override + protected void _setMedium(int index, int value) { + setMedium(index, value); + } + + @Override + protected void _setMediumLE(int index, int value) { + this.chk(index, 3); + long addr = this.addr(index); + PlatformDependent.putByte(addr, (byte) value); + PlatformDependent.putShort(addr + 1L, Short.reverseBytes((short) (value >>> 8))); + } + + @Override + public NettyArrowBuf setMedium(int index, int value) { + chk(index, 3); + final long addr = addr(index); + // we need to store 3 bytes starting from least significant byte + // and ignoring the most significant byte + // since arrow memory format is little endian, we will + // first store the first 2 bytes followed by third byte + // example: if the 4 byte int value is ABCD where A is MSB + // D is LSB then we effectively want to store DCB in increasing + // address to get Little Endian byte order + // (short)value will give us CD and PlatformDependent.putShort() + // will store them in LE order as DC starting at address addr + // in order to get B, we do ABCD >>> 16 = 00AB => (byte)AB which + // gives B. We store this at address addr + 2. So finally we get + // DCB + PlatformDependent.putShort(addr, (short) value); + PlatformDependent.putByte(addr + 2, (byte) (value >>> 16)); + return this; + } + + @Override + protected void _setInt(int index, int value) { + setInt(index, value); + } + + @Override + protected void _setIntLE(int index, int value) { + this.chk(index, 4); + PlatformDependent.putInt(this.addr(index), Integer.reverseBytes(value)); + } + + @Override + public NettyArrowBuf setInt(int index, int value) { + arrowBuf.setInt(index, value); + return this; + } + + @Override + protected void _setLong(int index, long value) { + setLong(index, value); + } + + @Override + public void _setLongLE(int index, long value) { + this.chk(index, 8); + PlatformDependent.putLong(this.addr(index), Long.reverseBytes(value)); + } + + @Override + public NettyArrowBuf setLong(int index, long value) { + arrowBuf.setLong(index, value); + return this; + } + + /** + * unwrap arrow buffer into a netty buffer. + */ + public static NettyArrowBuf unwrapBuffer(ArrowBuf buf) { + final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf( + buf, + buf.getReferenceManager().getAllocator(), + checkedCastToInt(buf.capacity())); + nettyArrowBuf.readerIndex(checkedCastToInt(buf.readerIndex())); + nettyArrowBuf.writerIndex(checkedCastToInt(buf.writerIndex())); + return nettyArrowBuf; + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java new file mode 100644 index 000000000..d0a5a9945 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.util.LargeMemoryUtil; + +import io.netty.util.internal.OutOfDirectMemoryError; +import io.netty.util.internal.StringUtil; + +/** + * The base allocator that we use for all of Arrow's memory management. Returns + * UnsafeDirectLittleEndian buffers. + */ +public class PooledByteBufAllocatorL { + + private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); + + private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + public final UnsafeDirectLittleEndian empty; + private final AtomicLong hugeBufferSize = new AtomicLong(0); + private final AtomicLong hugeBufferCount = new AtomicLong(0); + private final AtomicLong normalBufferSize = new AtomicLong(0); + private final AtomicLong normalBufferCount = new AtomicLong(0); + private final InnerAllocator allocator; + + public PooledByteBufAllocatorL() { + allocator = new InnerAllocator(); + empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); + } + + /** + * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. + */ + public UnsafeDirectLittleEndian allocate(long size) { + try { + return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); + } catch (OutOfMemoryError e) { + /* + * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by + * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by + * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice + * as Netty is expected to throw an OutOfDirectMemoryError first. + */ + if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { + throw new OutOfMemoryException("Failure allocating buffer.", e); + } + throw e; + } + } + + public int getChunkSize() { + return allocator.chunkSize; + } + + public long getHugeBufferSize() { + return hugeBufferSize.get(); + } + + public long getHugeBufferCount() { + return hugeBufferCount.get(); + } + + public long getNormalBufferSize() { + return normalBufferSize.get(); + } + + public long getNormalBufferCount() { + return normalBufferSize.get(); + } + + private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { + + private final long initialCapacity; + private final AtomicLong count; + private final AtomicLong size; + + private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, + AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public boolean release(int decrement) { + boolean released = super.release(decrement); + if (released) { + count.decrementAndGet(); + size.addAndGet(-initialCapacity); + } + return released; + } + + } + + private class InnerAllocator extends PooledByteBufAllocator { + + private final PoolArena<ByteBuffer>[] directArenas; + private final MemoryStatusThread statusThread; + private final int chunkSize; + + public InnerAllocator() { + super(true); + + try { + Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); + f.setAccessible(true); + this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this); + } catch (Exception e) { + throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + } + + this.chunkSize = directArenas[0].chunkSize; + + if (memoryLogger.isTraceEnabled()) { + statusThread = new MemoryStatusThread(); + statusThread.start(); + } else { + statusThread = null; + } + } + + private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache(); + PoolArena<ByteBuffer> directArena = cache.directArena; + + if (directArena != null) { + + if (initialCapacity > directArena.chunkSize) { + // This is beyond chunk size so we'll allocate separately. + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + + hugeBufferSize.addAndGet(buf.capacity()); + hugeBufferCount.incrementAndGet(); + + // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); + return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, + hugeBufferSize); + } else { + // within chunk, use arena. + ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); + if (!(buf instanceof PooledUnsafeDirectByteBuf)) { + fail(); + } + + if (!ASSERT_ENABLED) { + return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); + } + + normalBufferSize.addAndGet(buf.capacity()); + normalBufferCount.incrementAndGet(); + + return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, + normalBufferCount, normalBufferSize); + } + + } else { + throw fail(); + } + } + + private UnsupportedOperationException fail() { + return new UnsupportedOperationException( + "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + + "didn't provide that functionality."); + } + + @Override + public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { + if (initialCapacity == 0 && maxCapacity == 0) { + newDirectBuffer(initialCapacity, maxCapacity); + } + validate(initialCapacity, maxCapacity); + return newDirectBufferL(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); + } + + + private void validate(int initialCapacity, int maxCapacity) { + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity: %d (expected: not greater than maxCapacity(%d)", + initialCapacity, maxCapacity)); + } + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(directArenas.length); + buf.append(" direct arena(s):"); + buf.append(StringUtil.NEWLINE); + for (PoolArena<ByteBuffer> a : directArenas) { + buf.append(a); + } + + buf.append("Large buffers outstanding: "); + buf.append(hugeBufferCount.get()); + buf.append(" totaling "); + buf.append(hugeBufferSize.get()); + buf.append(" bytes."); + buf.append('\n'); + buf.append("Normal buffers outstanding: "); + buf.append(normalBufferCount.get()); + buf.append(" totaling "); + buf.append(normalBufferSize.get()); + buf.append(" bytes."); + return buf.toString(); + } + + private class MemoryStatusThread extends Thread { + + public MemoryStatusThread() { + super("allocation.logger"); + this.setDaemon(true); + } + + @Override + public void run() { + while (true) { + memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); + try { + Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); + } catch (InterruptedException e) { + return; + } + } + } + } + + + } +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java new file mode 100644 index 000000000..e900b1ca7 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteOrder; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.util.internal.PlatformDependent; + +/** + * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs + * to abstract away the + * Netty classes and underlying Netty memory management. + */ +public class UnsafeDirectLittleEndian extends WrappedByteBuf { + + public static final boolean ASSERT_ENABLED; + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + + static { + boolean isAssertEnabled = false; + assert isAssertEnabled = true; + ASSERT_ENABLED = isAssertEnabled; + } + + public final long id = ID_GENERATOR.incrementAndGet(); + private final AbstractByteBuf wrapped; + private final long memoryAddress; + + UnsafeDirectLittleEndian(DuplicatedByteBuf buf) { + this(buf, true); + } + + UnsafeDirectLittleEndian(LargeBuffer buf) { + this(buf, true); + } + + UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) { + this(buf, true); + } + + private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) { + super(buf); + + this.wrapped = buf; + this.memoryAddress = buf.memoryAddress(); + } + + private long addr(int index) { + return memoryAddress + index; + } + + @Override + public long getLong(int index) { + // wrapped.checkIndex(index, 8); + long v = PlatformDependent.getLong(addr(index)); + return v; + } + + @Override + public float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + + @Override + public ByteBuf slice() { + return slice(this.readerIndex(), readableBytes()); + } + + @Override + public ByteBuf slice(int index, int length) { + return new SlicedByteBuf(this, index, length); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + return this; + } + + @Override + public double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + @Override + public char getChar(int index) { + return (char) getShort(index); + } + + @Override + public long getUnsignedInt(int index) { + return getInt(index) & 0xFFFFFFFFL; + } + + @Override + public int getInt(int index) { + int v = PlatformDependent.getInt(addr(index)); + return v; + } + + @Override + public int getUnsignedShort(int index) { + return getShort(index) & 0xFFFF; + } + + @Override + public short getShort(int index) { + short v = PlatformDependent.getShort(addr(index)); + return v; + } + + @Override + public ByteBuf setShort(int index, int value) { + wrapped.checkIndex(index, 2); + setShort_(index, value); + return this; + } + + @Override + public ByteBuf setInt(int index, int value) { + wrapped.checkIndex(index, 4); + setInt_(index, value); + return this; + } + + @Override + public ByteBuf setLong(int index, long value) { + wrapped.checkIndex(index, 8); + setLong_(index, value); + return this; + } + + @Override + public ByteBuf setChar(int index, int value) { + setShort(index, value); + return this; + } + + @Override + public ByteBuf setFloat(int index, float value) { + setInt(index, Float.floatToRawIntBits(value)); + return this; + } + + @Override + public ByteBuf setDouble(int index, double value) { + setLong(index, Double.doubleToRawLongBits(value)); + return this; + } + + @Override + public ByteBuf writeShort(int value) { + wrapped.ensureWritable(2); + setShort_(wrapped.writerIndex, value); + wrapped.writerIndex += 2; + return this; + } + + @Override + public ByteBuf writeInt(int value) { + wrapped.ensureWritable(4); + setInt_(wrapped.writerIndex, value); + wrapped.writerIndex += 4; + return this; + } + + @Override + public ByteBuf writeLong(long value) { + wrapped.ensureWritable(8); + setLong_(wrapped.writerIndex, value); + wrapped.writerIndex += 8; + return this; + } + + @Override + public ByteBuf writeChar(int value) { + writeShort(value); + return this; + } + + @Override + public ByteBuf writeFloat(float value) { + writeInt(Float.floatToRawIntBits(value)); + return this; + } + + @Override + public ByteBuf writeDouble(double value) { + writeLong(Double.doubleToRawLongBits(value)); + return this; + } + + private void setShort_(int index, int value) { + PlatformDependent.putShort(addr(index), (short) value); + } + + private void setInt_(int index, int value) { + PlatformDependent.putInt(addr(index), value); + } + + private void setLong_(int index, long value) { + PlatformDependent.putLong(addr(index), value); + } + + @Override + public byte getByte(int index) { + return PlatformDependent.getByte(addr(index)); + } + + @Override + public ByteBuf setByte(int index, int value) { + PlatformDependent.putByte(addr(index), (byte) value); + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + wrapped.checkIndex(index, length); + byte[] tmp = new byte[length]; + int readBytes = in.read(tmp); + if (readBytes > 0) { + PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes); + } + return readBytes; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + wrapped.checkIndex(index, length); + if (length != 0) { + byte[] tmp = new byte[length]; + PlatformDependent.copyMemory(addr(index), tmp, 0, length); + out.write(tmp); + } + return this; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return this == obj; + } +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java new file mode 100644 index 000000000..ff40b49ff --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import io.netty.buffer.AbstractByteBufAllocator; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.ExpandableByteBuf; +import io.netty.buffer.NettyArrowBuf; + +/** + * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC + * layer to be accounted + * and managed using Arrow's BufferAllocator infrastructure. The only thin different from a + * typical BufferAllocator is + * the signature and the fact that this Allocator returns ExpandableByteBufs which enable + * otherwise non-expandable + * ArrowBufs to be expandable. + * + * @deprecated This class may be removed in a future release. + */ +@Deprecated +public class ArrowByteBufAllocator extends AbstractByteBufAllocator { + + private static final int DEFAULT_BUFFER_SIZE = 4096; + private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16; + + private final BufferAllocator allocator; + + public ArrowByteBufAllocator(BufferAllocator allocator) { + this.allocator = allocator; + } + + public BufferAllocator unwrap() { + return allocator; + } + + @Override + public ByteBuf buffer() { + return buffer(DEFAULT_BUFFER_SIZE); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return new ExpandableByteBuf(NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)), allocator); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return buffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return buffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer() { + return buffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return compositeBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return new CompositeByteBuf(this, true, maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return compositeBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return compositeBuffer(maxNumComponents); + } + + @Override + public boolean isDirectBufferPooled() { + return false; + } + + @Override + public ByteBuf heapBuffer() { + throw fail(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + throw fail(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw fail(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + throw fail(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + throw fail(); + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + throw fail(); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); + } + + private RuntimeException fail() { + throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); + } +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java new file mode 100644 index 000000000..10cfb5c16 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +/** + * The default Allocation Manager Factory for a module. + * + */ +public class DefaultAllocationManagerFactory implements AllocationManager.Factory { + + public static final AllocationManager.Factory FACTORY = NettyAllocationManager.FACTORY; + + @Override + public AllocationManager create(BufferAllocator accountingAllocator, long size) { + return FACTORY.create(accountingAllocator, size); + } + + @Override + public ArrowBuf empty() { + return FACTORY.empty(); + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java new file mode 100644 index 000000000..200047783 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.UnsafeDirectLittleEndian; +import io.netty.util.internal.PlatformDependent; + +/** + * The default implementation of {@link AllocationManager}. The implementation is responsible for managing when memory + * is allocated and returned to the Netty-based PooledByteBufAllocatorL. + */ +public class NettyAllocationManager extends AllocationManager { + + public static final AllocationManager.Factory FACTORY = new AllocationManager.Factory() { + + @Override + public AllocationManager create(BufferAllocator accountingAllocator, long size) { + return new NettyAllocationManager(accountingAllocator, size); + } + + @Override + public ArrowBuf empty() { + return EMPTY_BUFFER; + } + }; + + /** + * The default cut-off value for switching allocation strategies. + * If the request size is not greater than the cut-off value, we will allocate memory by + * {@link PooledByteBufAllocatorL} APIs, + * otherwise, we will use {@link PlatformDependent} APIs. + */ + public static final int DEFAULT_ALLOCATION_CUTOFF_VALUE = Integer.MAX_VALUE; + + private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); + static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; + static final ArrowBuf EMPTY_BUFFER = new ArrowBuf(ReferenceManager.NO_OP, + null, + 0, + NettyAllocationManager.EMPTY.memoryAddress()); + static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); + + private final long allocatedSize; + private final UnsafeDirectLittleEndian memoryChunk; + private final long allocatedAddress; + + /** + * The cut-off value for switching allocation strategies. + */ + private final int allocationCutOffValue; + + NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) { + super(accountingAllocator); + this.allocationCutOffValue = allocationCutOffValue; + + if (requestedSize > allocationCutOffValue) { + this.memoryChunk = null; + this.allocatedAddress = PlatformDependent.allocateMemory(requestedSize); + this.allocatedSize = requestedSize; + } else { + this.memoryChunk = INNER_ALLOCATOR.allocate(requestedSize); + this.allocatedAddress = memoryChunk.memoryAddress(); + this.allocatedSize = memoryChunk.capacity(); + } + } + + NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize) { + this(accountingAllocator, requestedSize, DEFAULT_ALLOCATION_CUTOFF_VALUE); + } + + /** + * Get the underlying memory chunk managed by this AllocationManager. + * @return the underlying memory chunk if the request size is not greater than the + * {@link NettyAllocationManager#allocationCutOffValue}, or null otherwise. + * + * @deprecated this method will be removed in a future release. + */ + @Deprecated + UnsafeDirectLittleEndian getMemoryChunk() { + return memoryChunk; + } + + @Override + protected long memoryAddress() { + return allocatedAddress; + } + + @Override + protected void release0() { + if (memoryChunk == null) { + PlatformDependent.freeMemory(allocatedAddress); + } else { + memoryChunk.release(); + } + } + + /** + * Returns the underlying memory chunk size managed. + * + * <p>NettyAllocationManager rounds requested size up to the next power of two. + */ + @Override + public long getSize() { + return allocatedSize; + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java new file mode 100644 index 000000000..916cf82e7 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestNettyArrowBuf.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.nio.ByteBuffer; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.ArrowByteBufAllocator; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.Assert; +import org.junit.Test; + +public class TestNettyArrowBuf { + + @Test + public void testSliceWithoutArgs() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + ) { + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); + nettyBuf.writerIndex(20); + nettyBuf.readerIndex(10); + NettyArrowBuf slicedBuffer = nettyBuf.slice(); + int readableBytes = slicedBuffer.readableBytes(); + Assert.assertEquals(10, readableBytes); + } + } + + @Test + public void testNioBuffer() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + ) { + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); + ByteBuffer byteBuffer = nettyBuf.nioBuffer(4, 6); + // Nio Buffers should always be 0 indexed + Assert.assertEquals(0, byteBuffer.position()); + Assert.assertEquals(6, byteBuffer.limit()); + // Underlying buffer has size 32 excluding 4 should have capacity of 28. + Assert.assertEquals(28, byteBuffer.capacity()); + + } + } + + @Test + public void testInternalNioBuffer() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + ) { + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); + ByteBuffer byteBuffer = nettyBuf.internalNioBuffer(4, 6); + Assert.assertEquals(0, byteBuffer.position()); + Assert.assertEquals(6, byteBuffer.limit()); + // Underlying buffer has size 32 excluding 4 should have capacity of 28. + Assert.assertEquals(28, byteBuffer.capacity()); + + } + } + + @Test + public void testSetLEValues() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + ) { + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); + int [] intVals = new int[] {Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MIN_VALUE, 0 , + Short.MAX_VALUE , Short.MAX_VALUE + 1, Integer.MAX_VALUE}; + for (int intValue :intVals ) { + nettyBuf._setInt(0, intValue); + Assert.assertEquals(nettyBuf._getIntLE(0), Integer.reverseBytes(intValue)); + } + + long [] longVals = new long[] {Long.MIN_VALUE, 0 , Long.MAX_VALUE}; + for (long longValue :longVals ) { + nettyBuf._setLong(0, longValue); + Assert.assertEquals(nettyBuf._getLongLE(0), Long.reverseBytes(longValue)); + } + + short [] shortVals = new short[] {Short.MIN_VALUE, 0 , Short.MAX_VALUE}; + for (short shortValue :shortVals ) { + nettyBuf._setShort(0, shortValue); + Assert.assertEquals(nettyBuf._getShortLE(0), Short.reverseBytes(shortValue)); + } + } + } + + @Test + public void testSetCompositeBuffer() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20)); + ) { + CompositeByteBuf byteBufs = new CompositeByteBuf(new ArrowByteBufAllocator(allocator), + true, 1); + int expected = 4; + buf2.setInt(0, expected); + buf2.writerIndex(4); + byteBufs.addComponent(true, buf2); + NettyArrowBuf.unwrapBuffer(buf).setBytes(0, byteBufs, 4); + int actual = buf.getInt(0); + Assert.assertEquals(expected, actual); + } + } + + @Test + public void testGetCompositeBuffer() { + try (BufferAllocator allocator = new RootAllocator(128); + ArrowBuf buf = allocator.buffer(20); + ) { + CompositeByteBuf byteBufs = new CompositeByteBuf(new ArrowByteBufAllocator(allocator), + true, 1); + int expected = 4; + buf.setInt(0, expected); + NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20)); + // composite buffers are a bit weird, need to jump hoops + // to set capacity. + byteBufs.addComponent(true, buf2); + byteBufs.capacity(20); + NettyArrowBuf.unwrapBuffer(buf).getBytes(0, byteBufs, 4); + int actual = byteBufs.getInt(0); + Assert.assertEquals(expected, actual); + byteBufs.component(0).release(); + } + } +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java new file mode 100644 index 000000000..c2bd95bb3 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/io/netty/buffer/TestUnsafeDirectLittleEndian.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import org.junit.Test; + +public class TestUnsafeDirectLittleEndian { + private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + @Test + public void testPrimitiveGetSet() { + ByteBuf byteBuf = Unpooled.directBuffer(64); + UnsafeDirectLittleEndian unsafeDirect = new UnsafeDirectLittleEndian(new LargeBuffer(byteBuf)); + + unsafeDirect.setByte(0, Byte.MAX_VALUE); + unsafeDirect.setByte(1, -1); // 0xFF + unsafeDirect.setShort(2, Short.MAX_VALUE); + unsafeDirect.setShort(4, -2); // 0xFFFE + unsafeDirect.setInt(8, Integer.MAX_VALUE); + unsafeDirect.setInt(12, -66052); // 0xFFFE FDFC + unsafeDirect.setLong(16, Long.MAX_VALUE); + unsafeDirect.setLong(24, -4295098372L); // 0xFFFF FFFE FFFD FFFC + unsafeDirect.setFloat(32, 1.23F); + unsafeDirect.setFloat(36, -1.23F); + unsafeDirect.setDouble(40, 1.234567D); + unsafeDirect.setDouble(48, -1.234567D); + + assertEquals(Byte.MAX_VALUE, unsafeDirect.getByte(0)); + assertEquals(-1, unsafeDirect.getByte(1)); + assertEquals(Short.MAX_VALUE, unsafeDirect.getShort(2)); + assertEquals(-2, unsafeDirect.getShort(4)); + assertEquals((char) 65534, unsafeDirect.getChar(4)); + assertEquals(Integer.MAX_VALUE, unsafeDirect.getInt(8)); + assertEquals(-66052, unsafeDirect.getInt(12)); + assertEquals(4294901244L, unsafeDirect.getUnsignedInt(12)); + assertEquals(Long.MAX_VALUE, unsafeDirect.getLong(16)); + assertEquals(-4295098372L, unsafeDirect.getLong(24)); + assertEquals(1.23F, unsafeDirect.getFloat(32), 0.0); + assertEquals(-1.23F, unsafeDirect.getFloat(36), 0.0); + assertEquals(1.234567D, unsafeDirect.getDouble(40), 0.0); + assertEquals(-1.234567D, unsafeDirect.getDouble(48), 0.0); + + byte[] inBytes = "1234567".getBytes(StandardCharsets.UTF_8); + try (ByteArrayInputStream bais = new ByteArrayInputStream(inBytes); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + assertEquals(5, unsafeDirect.setBytes(56, bais, 5)); + unsafeDirect.getBytes(56, baos, 5); + assertEquals("12345", new String(baos.toByteArray(), StandardCharsets.UTF_8)); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java new file mode 100644 index 000000000..fa8d510e3 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/ITTestLargeArrowBuf.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Integration test for large (more than 2GB) {@link org.apache.arrow.memory.ArrowBuf}. + * To run this test, please make sure there is at least 4GB memory in the system. + */ +public class ITTestLargeArrowBuf { + private static final Logger logger = LoggerFactory.getLogger(ITTestLargeArrowBuf.class); + + private void run(long bufSize) { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + ArrowBuf largeBuf = allocator.buffer(bufSize)) { + assertEquals(bufSize, largeBuf.capacity()); + logger.trace("Successfully allocated a buffer with capacity {}", largeBuf.capacity()); + + for (long i = 0; i < bufSize / 8; i++) { + largeBuf.setLong(i * 8, i); + + if ((i + 1) % 10000 == 0) { + logger.trace("Successfully written {} long words", i + 1); + } + } + logger.trace("Successfully written {} long words", bufSize / 8); + + for (long i = 0; i < bufSize / 8; i++) { + long val = largeBuf.getLong(i * 8); + assertEquals(i, val); + + if ((i + 1) % 10000 == 0) { + logger.trace("Successfully read {} long words", i + 1); + } + } + logger.trace("Successfully read {} long words", bufSize / 8); + } + logger.trace("Successfully released the large buffer."); + } + + @Test + public void testLargeArrowBuf() { + run(4 * 1024 * 1024 * 1024L); + } + + @Test + public void testMaxIntArrowBuf() { + run(Integer.MAX_VALUE); + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java new file mode 100644 index 000000000..2dbd56480 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestAllocationManagerNetty.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Test cases for {@link AllocationManager}. + */ +public class TestAllocationManagerNetty { + + @Test + public void testAllocationManagerType() { + // test netty allocation manager type + System.setProperty( + DefaultAllocationManagerOption.ALLOCATION_MANAGER_TYPE_PROPERTY_NAME, "Netty"); + DefaultAllocationManagerOption.AllocationManagerType mgrType = + DefaultAllocationManagerOption.getDefaultAllocationManagerType(); + + assertEquals(DefaultAllocationManagerOption.AllocationManagerType.Netty, mgrType); + } +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java new file mode 100644 index 000000000..ef49e4178 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -0,0 +1,1183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.arrow.memory.AllocationOutcomeDetails.Entry; +import org.apache.arrow.memory.rounding.RoundingPolicy; +import org.apache.arrow.memory.rounding.SegmentRoundingPolicy; +import org.apache.arrow.memory.util.AssertionUtil; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import sun.misc.Unsafe; + +public class TestBaseAllocator { + + private static final int MAX_ALLOCATION = 8 * 1024; + + /* + // ---------------------------------------- DEBUG ----------------------------------- + + @After + public void checkBuffers() { + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + if (bufferCount != 0) { + UnsafeDirectLittleEndian.logBuffers(logger); + UnsafeDirectLittleEndian.releaseBuffers(); + } + + assertEquals(0, bufferCount); + } + + // @AfterClass + // public static void dumpBuffers() { + // UnsafeDirectLittleEndian.logBuffers(logger); + // } + + // ---------------------------------------- DEBUG ------------------------------------ + */ + + + @Test + public void test_privateMax() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf1); + + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf2); + arrowBuf2.getReferenceManager().release(); + } + + arrowBuf1.getReferenceManager().release(); + } + } + + @Test(expected = IllegalStateException.class) + public void testRootAllocator_closeWithOutstanding() throws Exception { + try { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = rootAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + } + } finally { + /* + * We expect there to be one unreleased underlying buffer because we're closing + * without releasing it. + */ + /* + // ------------------------------- DEBUG --------------------------------- + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + UnsafeDirectLittleEndian.releaseBuffers(); + assertEquals(1, bufferCount); + // ------------------------------- DEBUG --------------------------------- + */ + } + } + + @Test + public void testRootAllocator_getEmpty() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = rootAllocator.buffer(0); + assertNotNull("allocation failed", arrowBuf); + assertEquals("capacity was non-zero", 0, arrowBuf.capacity()); + assertTrue("address should be valid", arrowBuf.memoryAddress() != 0); + arrowBuf.getReferenceManager().release(); + } + } + + @Ignore // TODO(DRILL-2740) + @Test(expected = IllegalStateException.class) + public void testAllocator_unreleasedEmpty() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + @SuppressWarnings("unused") + final ArrowBuf arrowBuf = rootAllocator.buffer(0); + } + } + + @Test + public void testAllocator_transferOwnership() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = + rootAllocator.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4); + rootAllocator.verify(); + final ReferenceManager referenceManager = arrowBuf1.getReferenceManager(); + OwnershipTransferResult transferOwnership = referenceManager.transferOwnership(arrowBuf1, childAllocator2); + assertEquiv(arrowBuf1, transferOwnership.getTransferredBuffer()); + final boolean allocationFit = transferOwnership.getAllocationFit(); + rootAllocator.verify(); + assertTrue(allocationFit); + + arrowBuf1.getReferenceManager().release(); + childAllocator1.close(); + rootAllocator.verify(); + + transferOwnership.getTransferredBuffer().getReferenceManager().release(); + childAllocator2.close(); + } + } + + static <T> boolean equalsIgnoreOrder(Collection<T> c1, Collection<T> c2) { + return (c1.size() == c2.size() && c1.containsAll(c2)); + } + + @Test + public void testAllocator_getParentAndChild() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + assertEquals(rootAllocator.getParentAllocator(), null); + + try (final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) { + assertEquals(childAllocator1.getParentAllocator(), rootAllocator); + assertTrue( + equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators())); + + try (final BufferAllocator childAllocator2 = + rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) { + assertEquals(childAllocator2.getParentAllocator(), rootAllocator); + assertTrue(equalsIgnoreOrder(Arrays.asList(childAllocator1, childAllocator2), + rootAllocator.getChildAllocators())); + + try (final BufferAllocator grandChildAllocator = + childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) { + assertEquals(grandChildAllocator.getParentAllocator(), childAllocator1); + assertTrue(equalsIgnoreOrder(Arrays.asList(grandChildAllocator), + childAllocator1.getChildAllocators())); + } + } + } + } + } + + @Test + public void testAllocator_childRemovedOnClose() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator2 = + rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) { + + // root has two child allocators + assertTrue(equalsIgnoreOrder(Arrays.asList(childAllocator1, childAllocator2), + rootAllocator.getChildAllocators())); + + try (final BufferAllocator grandChildAllocator = + childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) { + + // child1 has one allocator i.e grand-child + assertTrue(equalsIgnoreOrder(Arrays.asList(grandChildAllocator), + childAllocator1.getChildAllocators())); + } + + // grand-child closed + assertTrue( + equalsIgnoreOrder(Collections.EMPTY_SET, childAllocator1.getChildAllocators())); + } + // root has only one child left + assertTrue( + equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators())); + } + // all child allocators closed. + assertTrue(equalsIgnoreOrder(Collections.EMPTY_SET, rootAllocator.getChildAllocators())); + } + } + + @Test + public void testAllocator_shareOwnership() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, + MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, + MAX_ALLOCATION); + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4); + rootAllocator.verify(); + + // share ownership of buffer. + final ArrowBuf arrowBuf2 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator2); + rootAllocator.verify(); + assertNotNull(arrowBuf2); + assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); + + // release original buffer (thus transferring ownership to allocator 2. (should leave + // allocator 1 in empty state) + arrowBuf1.getReferenceManager().release(); + rootAllocator.verify(); + childAllocator1.close(); + rootAllocator.verify(); + + final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("shareOwnership3", 0, + MAX_ALLOCATION); + final ArrowBuf arrowBuf3 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator3); + assertNotNull(arrowBuf3); + assertNotEquals(arrowBuf3, arrowBuf1); + assertNotEquals(arrowBuf3, arrowBuf2); + assertEquiv(arrowBuf1, arrowBuf3); + rootAllocator.verify(); + + arrowBuf2.getReferenceManager().release(); + rootAllocator.verify(); + childAllocator2.close(); + rootAllocator.verify(); + + arrowBuf3.getReferenceManager().release(); + rootAllocator.verify(); + childAllocator3.close(); + } + } + + @Test + public void testRootAllocator_createChildAndUse() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( + "createChildAndUse", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = childAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + arrowBuf.getReferenceManager().release(); + } + } + } + + @Test(expected = IllegalStateException.class) + public void testRootAllocator_createChildDontClose() throws Exception { + try { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator = rootAllocator.newChildAllocator( + "createChildDontClose", 0, MAX_ALLOCATION); + final ArrowBuf arrowBuf = childAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + } + } finally { + /* + * We expect one underlying buffer because we closed a child allocator without + * releasing the buffer allocated from it. + */ + /* + // ------------------------------- DEBUG --------------------------------- + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + UnsafeDirectLittleEndian.releaseBuffers(); + assertEquals(1, bufferCount); + // ------------------------------- DEBUG --------------------------------- + */ + } + } + + @Test + public void testSegmentAllocator() { + RoundingPolicy policy = new SegmentRoundingPolicy(1024); + try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy)) { + ArrowBuf buf = allocator.buffer(798); + assertEquals(1024, buf.capacity()); + buf.setInt(333, 959); + assertEquals(959, buf.getInt(333)); + buf.close(); + + buf = allocator.buffer(1025); + assertEquals(2048, buf.capacity()); + buf.setInt(193, 939); + assertEquals(939, buf.getInt(193)); + buf.close(); + } + } + + @Test + public void testSegmentAllocator_childAllocator() { + RoundingPolicy policy = new SegmentRoundingPolicy(1024); + try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy); + BufferAllocator childAllocator = allocator.newChildAllocator("child", 0, 512 * 1024)) { + + assertEquals("child", childAllocator.getName()); + + ArrowBuf buf = childAllocator.buffer(798); + assertEquals(1024, buf.capacity()); + buf.setInt(333, 959); + assertEquals(959, buf.getInt(333)); + buf.close(); + + buf = childAllocator.buffer(1025); + assertEquals(2048, buf.capacity()); + buf.setInt(193, 939); + assertEquals(939, buf.getInt(193)); + buf.close(); + } + } + + @Test + public void testSegmentAllocator_smallSegment() { + IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> new SegmentRoundingPolicy(128) + ); + assertEquals("The segment size cannot be smaller than 1024", e.getMessage()); + } + + @Test + public void testSegmentAllocator_segmentSizeNotPowerOf2() { + IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> new SegmentRoundingPolicy(4097) + ); + assertEquals("The segment size must be a power of 2", e.getMessage()); + } + + @Test + public void testCustomizedAllocationManager() { + try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { + final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf1); + + arrowBuf1.setInt(0, 1); + assertEquals(1, arrowBuf1.getInt(0)); + + try { + final ArrowBuf arrowBuf2 = allocator.buffer(1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + arrowBuf1.getReferenceManager().release(); + + try { + arrowBuf1.getInt(0); + fail("data read from released buffer"); + } catch (RuntimeException e) { + // expected + } + } + } + + private BaseAllocator createAllocatorWithCustomizedAllocationManager() { + return new RootAllocator(BaseAllocator.configBuilder() + .maxAllocation(MAX_ALLOCATION) + .allocationManagerFactory(new AllocationManager.Factory() { + @Override + public AllocationManager create(BufferAllocator accountingAllocator, long requestedSize) { + return new AllocationManager(accountingAllocator) { + private final Unsafe unsafe = getUnsafe(); + private final long address = unsafe.allocateMemory(requestedSize); + + @Override + protected long memoryAddress() { + return address; + } + + @Override + protected void release0() { + unsafe.setMemory(address, requestedSize, (byte) 0); + unsafe.freeMemory(address); + } + + @Override + public long getSize() { + return requestedSize; + } + + private Unsafe getUnsafe() { + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } finally { + if (f != null) { + f.setAccessible(false); + } + } + } + }; + } + + @Override + public ArrowBuf empty() { + return null; + } + }).build()); + } + + // Allocation listener + // It counts the number of times it has been invoked, and how much memory allocation it has seen + // When set to 'expand on fail', it attempts to expand the associated allocator's limit + private static final class TestAllocationListener implements AllocationListener { + private int numPreCalls; + private int numCalls; + private int numReleaseCalls; + private int numChildren; + private long totalMem; + private boolean expandOnFail; + BufferAllocator expandAlloc; + long expandLimit; + + TestAllocationListener() { + this.numCalls = 0; + this.numChildren = 0; + this.totalMem = 0; + this.expandOnFail = false; + this.expandAlloc = null; + this.expandLimit = 0; + } + + @Override + public void onPreAllocation(long size) { + numPreCalls++; + } + + @Override + public void onAllocation(long size) { + numCalls++; + totalMem += size; + } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) { + if (expandOnFail) { + expandAlloc.setLimit(expandLimit); + return true; + } + return false; + } + + + @Override + public void onRelease(long size) { + numReleaseCalls++; + } + + @Override + public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + ++numChildren; + } + + @Override + public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + --numChildren; + } + + void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) { + this.expandOnFail = true; + this.expandAlloc = expandAlloc; + this.expandLimit = expandLimit; + } + + int getNumPreCalls() { + return numPreCalls; + } + + int getNumReleaseCalls() { + return numReleaseCalls; + } + + int getNumCalls() { + return numCalls; + } + + int getNumChildren() { + return numChildren; + } + + long getTotalMem() { + return totalMem; + } + } + + @Test + public void testRootAllocator_listeners() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumPreCalls()); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getNumReleaseCalls()); + assertEquals(0, l1.getNumChildren()); + assertEquals(0, l1.getTotalMem()); + TestAllocationListener l2 = new TestAllocationListener(); + assertEquals(0, l2.getNumPreCalls()); + assertEquals(0, l2.getNumCalls()); + assertEquals(0, l2.getNumReleaseCalls()); + assertEquals(0, l2.getNumChildren()); + assertEquals(0, l2.getTotalMem()); + // root and first-level child share the first listener + // second-level and third-level child share the second listener + try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) { + assertEquals(1, l1.getNumChildren()); + final ArrowBuf buf1 = c1.buffer(16); + assertNotNull("allocation failed", buf1); + assertEquals(1, l1.getNumPreCalls()); + assertEquals(1, l1.getNumCalls()); + assertEquals(0, l1.getNumReleaseCalls()); + assertEquals(16, l1.getTotalMem()); + buf1.getReferenceManager().release(); + try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) { + assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified + assertEquals(0, l2.getNumChildren()); + final ArrowBuf buf2 = c2.buffer(32); + assertNotNull("allocation failed", buf2); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(1, l2.getNumPreCalls()); + assertEquals(1, l2.getNumCalls()); + assertEquals(0, l2.getNumReleaseCalls()); + assertEquals(32, l2.getTotalMem()); + buf2.getReferenceManager().release(); + try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) { + assertEquals(2, l1.getNumChildren()); + assertEquals(1, l2.getNumChildren()); + final ArrowBuf buf3 = c3.buffer(64); + assertNotNull("allocation failed", buf3); + assertEquals(1, l1.getNumPreCalls()); + assertEquals(1, l1.getNumCalls()); + assertEquals(1, l1.getNumReleaseCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(2, l2.getNumPreCalls()); + assertEquals(2, l2.getNumCalls()); + assertEquals(1, l2.getNumReleaseCalls()); + assertEquals(32 + 64, l2.getTotalMem()); + buf3.getReferenceManager().release(); + } + assertEquals(2, l1.getNumChildren()); + assertEquals(0, l2.getNumChildren()); // third-level child removed + } + assertEquals(1, l1.getNumChildren()); // second-level child removed + assertEquals(0, l2.getNumChildren()); + } + assertEquals(0, l1.getNumChildren()); // first-level child removed + + assertEquals(2, l2.getNumReleaseCalls()); + } + } + + @Test + public void testRootAllocator_listenerAllocationFail() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + // Test attempts to allocate too much from a child whose limit is set to half of the max + // allocation. The listener's callback triggers, expanding the child allocator's limit, so then + // the allocation succeeds. + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1, 0, + MAX_ALLOCATION / 2)) { + try { + c1.buffer(MAX_ALLOCATION); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + + l1.setExpandOnFail(c1, MAX_ALLOCATION); + ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf); + assertEquals(1, l1.getNumCalls()); + assertEquals(MAX_ALLOCATION, l1.getTotalMem()); + arrowBuf.getReferenceManager().release(); + } + } + } + + private static void allocateAndFree(final BufferAllocator allocator) { + final ArrowBuf arrowBuf = allocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + arrowBuf.getReferenceManager().release(); + + final ArrowBuf arrowBuf2 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf2); + arrowBuf2.getReferenceManager().release(); + + final int nBufs = 8; + final ArrowBuf[] arrowBufs = new ArrowBuf[nBufs]; + for (int i = 0; i < arrowBufs.length; ++i) { + ArrowBuf arrowBufi = allocator.buffer(MAX_ALLOCATION / nBufs); + assertNotNull("allocation failed", arrowBufi); + arrowBufs[i] = arrowBufi; + } + for (ArrowBuf arrowBufi : arrowBufs) { + arrowBufi.getReferenceManager().release(); + } + } + + @Test + public void testAllocator_manyAllocations() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) { + allocateAndFree(childAllocator); + } + } + } + + @Test + public void testAllocator_overAllocate() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) { + allocateAndFree(childAllocator); + + try { + childAllocator.buffer(MAX_ALLOCATION + 1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + } + } + } + + @Test + public void testAllocator_overAllocateParent() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf1); + final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf2); + + try { + childAllocator.buffer(MAX_ALLOCATION / 4); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + + arrowBuf1.getReferenceManager().release(); + arrowBuf2.getReferenceManager().release(); + } + } + } + + @Test + public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("child", 0, MAX_ALLOCATION / 2)) { + try (final BufferAllocator grandChildAllocator = + childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, MAX_ALLOCATION)) { + OutOfMemoryException e = assertThrows(OutOfMemoryException.class, + () -> grandChildAllocator.buffer(MAX_ALLOCATION)); + // expected + assertTrue(e.getMessage().contains("Unable to allocate buffer")); + + assertTrue("missing outcome details", e.getOutcomeDetails().isPresent()); + AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get(); + + assertEquals(outcomeDetails.getFailedAllocator(), childAllocator); + + // The order of allocators should be child to root (request propagates to parent if + // child cannot satisfy the request). + Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator(); + AllocationOutcomeDetails.Entry first = iterator.next(); + assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize()); + assertEquals(MAX_ALLOCATION, first.getRequestedSize()); + assertEquals(false, first.isAllocationFailed()); + + AllocationOutcomeDetails.Entry second = iterator.next(); + assertEquals(MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize()); + assertEquals(0, second.getAllocatedSize()); + assertEquals(true, second.isAllocationFailed()); + + assertFalse(iterator.hasNext()); + } + } + } + } + + @Test + public void testAllocator_failureAtRootLimitOutcomeDetails() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("child", MAX_ALLOCATION / 2, Long.MAX_VALUE)) { + try (final BufferAllocator grandChildAllocator = + childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, Long.MAX_VALUE)) { + OutOfMemoryException e = assertThrows(OutOfMemoryException.class, + () -> grandChildAllocator.buffer(MAX_ALLOCATION * 2)); + + assertTrue(e.getMessage().contains("Unable to allocate buffer")); + assertTrue("missing outcome details", e.getOutcomeDetails().isPresent()); + AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get(); + + assertEquals(outcomeDetails.getFailedAllocator(), rootAllocator); + + // The order of allocators should be child to root (request propagates to parent if + // child cannot satisfy the request). + Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator(); + AllocationOutcomeDetails.Entry first = iterator.next(); + assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize()); + assertEquals(2 * MAX_ALLOCATION, first.getRequestedSize()); + assertEquals(false, first.isAllocationFailed()); + + AllocationOutcomeDetails.Entry second = iterator.next(); + assertEquals(MAX_ALLOCATION / 4, second.getAllocatedSize()); + assertEquals(2 * MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize()); + assertEquals(false, second.isAllocationFailed()); + + AllocationOutcomeDetails.Entry third = iterator.next(); + assertEquals(0, third.getAllocatedSize()); + assertEquals(true, third.isAllocationFailed()); + + assertFalse(iterator.hasNext()); + } + } + } + } + + private static void testAllocator_sliceUpBufferAndRelease( + final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) { + final ArrowBuf arrowBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2); + rootAllocator.verify(); + + final ArrowBuf arrowBuf2 = arrowBuf1.slice(16, arrowBuf1.capacity() - 32); + rootAllocator.verify(); + final ArrowBuf arrowBuf3 = arrowBuf2.slice(16, arrowBuf2.capacity() - 32); + rootAllocator.verify(); + @SuppressWarnings("unused") + final ArrowBuf arrowBuf4 = arrowBuf3.slice(16, arrowBuf3.capacity() - 32); + rootAllocator.verify(); + + arrowBuf3.getReferenceManager().release(); // since they share refcounts, one is enough to release them all + rootAllocator.verify(); + } + + @Test + public void testAllocator_createSlices() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); + + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, + MAX_ALLOCATION)) { + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + } + rootAllocator.verify(); + + testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); + + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, + MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator2 = + childAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8); + @SuppressWarnings("unused") + final ArrowBuf arrowBuf2 = arrowBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16); + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + arrowBuf1.getReferenceManager().release(); + rootAllocator.verify(); + } + rootAllocator.verify(); + + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + } + rootAllocator.verify(); + } + } + + @Test + public void testAllocator_sliceRanges() throws Exception { + // final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges"); + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + // Populate a buffer with byte values corresponding to their indices. + final ArrowBuf arrowBuf = rootAllocator.buffer(256); + assertEquals(256, arrowBuf.capacity()); + assertEquals(0, arrowBuf.readerIndex()); + assertEquals(0, arrowBuf.readableBytes()); + assertEquals(0, arrowBuf.writerIndex()); + assertEquals(256, arrowBuf.writableBytes()); + + final ArrowBuf slice3 = arrowBuf.slice(); + assertEquals(0, slice3.readerIndex()); + assertEquals(0, slice3.readableBytes()); + assertEquals(0, slice3.writerIndex()); + // assertEquals(256, slice3.capacity()); + // assertEquals(256, slice3.writableBytes()); + + for (int i = 0; i < 256; ++i) { + arrowBuf.writeByte(i); + } + assertEquals(0, arrowBuf.readerIndex()); + assertEquals(256, arrowBuf.readableBytes()); + assertEquals(256, arrowBuf.writerIndex()); + assertEquals(0, arrowBuf.writableBytes()); + + final ArrowBuf slice1 = arrowBuf.slice(); + assertEquals(0, slice1.readerIndex()); + assertEquals(256, slice1.readableBytes()); + for (int i = 0; i < 10; ++i) { + assertEquals(i, slice1.readByte()); + } + assertEquals(256 - 10, slice1.readableBytes()); + for (int i = 0; i < 256; ++i) { + assertEquals((byte) i, slice1.getByte(i)); + } + + final ArrowBuf slice2 = arrowBuf.slice(25, 25); + assertEquals(0, slice2.readerIndex()); + assertEquals(25, slice2.readableBytes()); + for (int i = 25; i < 50; ++i) { + assertEquals(i, slice2.readByte()); + } + + /* + for(int i = 256; i > 0; --i) { + slice3.writeByte(i - 1); + } + for(int i = 0; i < 256; ++i) { + assertEquals(255 - i, slice1.getByte(i)); + } + */ + + arrowBuf.getReferenceManager().release(); // all the derived buffers share this fate + } + } + + @Test + public void testAllocator_slicesOfSlices() throws Exception { + // final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices"); + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + // Populate a buffer with byte values corresponding to their indices. + final ArrowBuf arrowBuf = rootAllocator.buffer(256); + for (int i = 0; i < 256; ++i) { + arrowBuf.writeByte(i); + } + + // Slice it up. + final ArrowBuf slice0 = arrowBuf.slice(0, arrowBuf.capacity()); + for (int i = 0; i < 256; ++i) { + assertEquals((byte) i, arrowBuf.getByte(i)); + } + + final ArrowBuf slice10 = slice0.slice(10, arrowBuf.capacity() - 10); + for (int i = 10; i < 256; ++i) { + assertEquals((byte) i, slice10.getByte(i - 10)); + } + + final ArrowBuf slice20 = slice10.slice(10, arrowBuf.capacity() - 20); + for (int i = 20; i < 256; ++i) { + assertEquals((byte) i, slice20.getByte(i - 20)); + } + + final ArrowBuf slice30 = slice20.slice(10, arrowBuf.capacity() - 30); + for (int i = 30; i < 256; ++i) { + assertEquals((byte) i, slice30.getByte(i - 30)); + } + + arrowBuf.getReferenceManager().release(); + } + } + + @Test + public void testAllocator_transferSliced() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8); + + final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2); + final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2); + + rootAllocator.verify(); + + OwnershipTransferResult result1 = arrowBuf2s.getReferenceManager().transferOwnership(arrowBuf2s, childAllocator1); + assertEquiv(arrowBuf2s, result1.getTransferredBuffer()); + rootAllocator.verify(); + OwnershipTransferResult result2 = arrowBuf1s.getReferenceManager().transferOwnership(arrowBuf1s, childAllocator2); + assertEquiv(arrowBuf1s, result2.getTransferredBuffer()); + rootAllocator.verify(); + + result1.getTransferredBuffer().getReferenceManager().release(); + result2.getTransferredBuffer().getReferenceManager().release(); + + arrowBuf1s.getReferenceManager().release(); // releases arrowBuf1 + arrowBuf2s.getReferenceManager().release(); // releases arrowBuf2 + + childAllocator1.close(); + childAllocator2.close(); + } + } + + @Test + public void testAllocator_shareSliced() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8); + + final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2); + final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2); + + rootAllocator.verify(); + + final ArrowBuf arrowBuf2s1 = arrowBuf2s.getReferenceManager().retain(arrowBuf2s, childAllocator1); + assertEquiv(arrowBuf2s, arrowBuf2s1); + final ArrowBuf arrowBuf1s2 = arrowBuf1s.getReferenceManager().retain(arrowBuf1s, childAllocator2); + assertEquiv(arrowBuf1s, arrowBuf1s2); + rootAllocator.verify(); + + arrowBuf1s.getReferenceManager().release(); // releases arrowBuf1 + arrowBuf2s.getReferenceManager().release(); // releases arrowBuf2 + rootAllocator.verify(); + + arrowBuf2s1.getReferenceManager().release(); // releases the shared arrowBuf2 slice + arrowBuf1s2.getReferenceManager().release(); // releases the shared arrowBuf1 slice + + childAllocator1.close(); + childAllocator2.close(); + } + } + + @Test + public void testAllocator_transferShared() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + + boolean allocationFit; + + ArrowBuf arrowBuf2 = arrowBuf1.getReferenceManager().retain(arrowBuf1, childAllocator2); + rootAllocator.verify(); + assertNotNull(arrowBuf2); + assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); + + final ReferenceManager refManager1 = arrowBuf1.getReferenceManager(); + final OwnershipTransferResult result1 = refManager1.transferOwnership(arrowBuf1, childAllocator3); + allocationFit = result1.getAllocationFit(); + final ArrowBuf arrowBuf3 = result1.getTransferredBuffer(); + assertTrue(allocationFit); + assertEquiv(arrowBuf1, arrowBuf3); + rootAllocator.verify(); + + // Since childAllocator3 now has childAllocator1's buffer, 1, can close + arrowBuf1.getReferenceManager().release(); + childAllocator1.close(); + rootAllocator.verify(); + + arrowBuf2.getReferenceManager().release(); + childAllocator2.close(); + rootAllocator.verify(); + + final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION); + final ReferenceManager refManager3 = arrowBuf3.getReferenceManager(); + final OwnershipTransferResult result3 = refManager3.transferOwnership(arrowBuf3, childAllocator4); + allocationFit = result3.getAllocationFit(); + final ArrowBuf arrowBuf4 = result3.getTransferredBuffer(); + assertTrue(allocationFit); + assertEquiv(arrowBuf3, arrowBuf4); + rootAllocator.verify(); + + arrowBuf3.getReferenceManager().release(); + childAllocator3.close(); + rootAllocator.verify(); + + arrowBuf4.getReferenceManager().release(); + childAllocator4.close(); + rootAllocator.verify(); + } + } + + @Test + public void testAllocator_unclaimedReservation() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) { + try (final AllocationReservation reservation = childAllocator1.newReservation()) { + assertTrue(reservation.add(64)); + } + rootAllocator.verify(); + } + } + } + + @Test + public void testAllocator_claimedReservation() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + + try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator( + "claimedReservation", 0, MAX_ALLOCATION)) { + + try (final AllocationReservation reservation = childAllocator1.newReservation()) { + assertTrue(reservation.add(32)); + assertTrue(reservation.add(32)); + + final ArrowBuf arrowBuf = reservation.allocateBuffer(); + assertEquals(64, arrowBuf.capacity()); + rootAllocator.verify(); + + arrowBuf.getReferenceManager().release(); + rootAllocator.verify(); + } + rootAllocator.verify(); + } + } + } + + @Test + public void testInitReservationAndLimit() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( + "child", 2048, 4096)) { + assertEquals(2048, childAllocator.getInitReservation()); + assertEquals(4096, childAllocator.getLimit()); + } + } + } + + @Test + public void multiple() throws Exception { + final String owner = "test"; + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + + final int op = 100000; + + BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE); + BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b11 = allocator11.buffer(1000000); + + allocator.verify(); + + BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b12 = allocator12.buffer(500000); + + allocator.verify(); + + BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b22 = allocator22.buffer(2000000); + + allocator.verify(); + + BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b31a = allocator31.buffer(200000); + + allocator.verify(); + + // Previously running operator completes + b22.getReferenceManager().release(); + + allocator.verify(); + + allocator22.close(); + + b31a.getReferenceManager().release(); + allocator31.close(); + + b12.getReferenceManager().release(); + allocator12.close(); + + allocator21.close(); + + b11.getReferenceManager().release(); + allocator11.close(); + + frag1.close(); + frag2.close(); + frag3.close(); + + } + } + + // This test needs to run in non-debug mode. So disabling the assertion status through class loader for this. + // The test passes if run individually with -Dtest=TestBaseAllocator#testMemoryLeakWithReservation + // but fails generally since the assertion status cannot be changed once the class is initialized. + // So setting the test to @ignore + @Test(expected = IllegalStateException.class) + @Ignore + public void testMemoryLeakWithReservation() throws Exception { + // disabling assertion status + AssertionUtil.class.getClassLoader().setClassAssertionStatus(AssertionUtil.class.getName(), false); + try (RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + ChildAllocator childAllocator1 = (ChildAllocator) rootAllocator.newChildAllocator( + "child1", 1024, MAX_ALLOCATION); + rootAllocator.verify(); + + ChildAllocator childAllocator2 = (ChildAllocator) childAllocator1.newChildAllocator( + "child2", 1024, MAX_ALLOCATION); + rootAllocator.verify(); + + ArrowBuf buff = childAllocator2.buffer(256); + + Exception exception = assertThrows(IllegalStateException.class, () -> { + childAllocator2.close(); + }); + String exMessage = exception.getMessage(); + assertTrue(exMessage.contains("Memory leaked: (256)")); + + exception = assertThrows(IllegalStateException.class, () -> { + childAllocator1.close(); + }); + exMessage = exception.getMessage(); + assertTrue(exMessage.contains("Memory leaked: (256)")); + } + } + + public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { + assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); + assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); + } +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java new file mode 100644 index 000000000..3fd7ce74a --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.PooledByteBufAllocatorL; + +public class TestEmptyArrowBuf { + + private static final int MAX_ALLOCATION = 8 * 1024; + private static RootAllocator allocator; + + @BeforeClass + public static void beforeClass() { + allocator = new RootAllocator(MAX_ALLOCATION); + } + + /** Ensure the allocator is closed. */ + @AfterClass + public static void afterClass() { + if (allocator != null) { + allocator.close(); + } + } + + @Test + public void testZeroBuf() { + // Exercise the historical log inside the empty ArrowBuf. This is initialized statically, and there is a circular + // dependency between ArrowBuf and BaseAllocator, so if the initialization happens in the wrong order, the + // historical log will be null even though BaseAllocator.DEBUG is true. + allocator.getEmpty().print(new StringBuilder(), 0, BaseAllocator.Verbosity.LOG_WITH_STACKTRACE); + } + + @Test + public void testEmptyArrowBuf() { + ArrowBuf buf = new ArrowBuf(ReferenceManager.NO_OP, null, + 1024, new PooledByteBufAllocatorL().empty.memoryAddress()); + + buf.getReferenceManager().retain(); + buf.getReferenceManager().retain(8); + assertEquals(1024, buf.capacity()); + assertEquals(1, buf.getReferenceManager().getRefCount()); + assertEquals(0, buf.getActualMemoryConsumed()); + + for (int i = 0; i < 10; i++) { + buf.setByte(i, i); + } + assertEquals(0, buf.getActualMemoryConsumed()); + assertEquals(0, buf.getReferenceManager().getSize()); + assertEquals(0, buf.getReferenceManager().getAccountedSize()); + assertEquals(false, buf.getReferenceManager().release()); + assertEquals(false, buf.getReferenceManager().release(2)); + assertEquals(0, buf.getReferenceManager().getAllocator().getLimit()); + assertEquals(buf, buf.getReferenceManager().transferOwnership(buf, allocator).getTransferredBuffer()); + assertEquals(0, buf.readerIndex()); + assertEquals(0, buf.writerIndex()); + assertEquals(1, buf.refCnt()); + + ArrowBuf derive = buf.getReferenceManager().deriveBuffer(buf, 0, 100); + assertEquals(derive, buf); + assertEquals(1, buf.refCnt()); + assertEquals(1, derive.refCnt()); + + buf.close(); + + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java new file mode 100644 index 000000000..dcaeb2488 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestEndianness.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteOrder; + +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.NettyArrowBuf; + +public class TestEndianness { + + @Test + public void testNativeEndian() { + final BufferAllocator a = new RootAllocator(10000); + final ByteBuf b = NettyArrowBuf.unwrapBuffer(a.buffer(4)); + b.setInt(0, 35); + if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) { + assertEquals(b.getByte(0), 35); + assertEquals(b.getByte(1), 0); + assertEquals(b.getByte(2), 0); + assertEquals(b.getByte(3), 0); + } else { + assertEquals(b.getByte(0), 0); + assertEquals(b.getByte(1), 0); + assertEquals(b.getByte(2), 0); + assertEquals(b.getByte(3), 35); + } + b.release(); + a.close(); + } + +} diff --git a/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java new file mode 100644 index 000000000..1b64cd733 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Test cases for {@link NettyAllocationManager}. + */ +public class TestNettyAllocationManager { + + static int CUSTOMIZED_ALLOCATION_CUTOFF_VALUE = 1024; + + private BaseAllocator createCustomizedAllocator() { + return new RootAllocator(BaseAllocator.configBuilder() + .allocationManagerFactory(new AllocationManager.Factory() { + @Override + public AllocationManager create(BufferAllocator accountingAllocator, long size) { + return new NettyAllocationManager(accountingAllocator, size, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE); + } + + @Override + public ArrowBuf empty() { + return null; + } + }).build()); + } + + private void readWriteArrowBuf(ArrowBuf buffer) { + // write buffer + for (long i = 0; i < buffer.capacity() / 8; i++) { + buffer.setLong(i * 8, i); + } + + // read buffer + for (long i = 0; i < buffer.capacity() / 8; i++) { + long val = buffer.getLong(i * 8); + assertEquals(i, val); + } + } + + /** + * Test the allocation strategy for small buffers.. + */ + @Test + public void testSmallBufferAllocation() { + final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE - 512L; + try (BaseAllocator allocator = createCustomizedAllocator(); + ArrowBuf buffer = allocator.buffer(bufSize)) { + + assertTrue(buffer.getReferenceManager() instanceof BufferLedger); + BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager(); + + // make sure we are using netty allocation manager + AllocationManager allocMgr = bufferLedger.getAllocationManager(); + assertTrue(allocMgr instanceof NettyAllocationManager); + NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr; + + // for the small buffer allocation strategy, the chunk is not null + assertNotNull(nettyMgr.getMemoryChunk()); + + readWriteArrowBuf(buffer); + } + } + + /** + * Test the allocation strategy for large buffers.. + */ + @Test + public void testLargeBufferAllocation() { + final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE + 1024L; + try (BaseAllocator allocator = createCustomizedAllocator(); + ArrowBuf buffer = allocator.buffer(bufSize)) { + assertTrue(buffer.getReferenceManager() instanceof BufferLedger); + BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager(); + + // make sure we are using netty allocation manager + AllocationManager allocMgr = bufferLedger.getAllocationManager(); + assertTrue(allocMgr instanceof NettyAllocationManager); + NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr; + + // for the large buffer allocation strategy, the chunk is null + assertNull(nettyMgr.getMemoryChunk()); + + readWriteArrowBuf(buffer); + } + } +} diff --git a/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml b/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml new file mode 100644 index 000000000..4c54d18a2 --- /dev/null +++ b/src/arrow/java/memory/memory-netty/src/test/resources/logback.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.arrow" additivity="false"> + <level value="info" /> + <appender-ref ref="STDOUT" /> + </logger> + +</configuration> |