diff options
Diffstat (limited to 'src/arrow/java/compression')
5 files changed, 471 insertions, 0 deletions
diff --git a/src/arrow/java/compression/pom.xml b/src/arrow/java/compression/pom.xml new file mode 100644 index 000000000..c2e460380 --- /dev/null +++ b/src/arrow/java/compression/pom.xml @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>6.0.1</version> + </parent> + <artifactId>arrow-compression</artifactId> + <name>Arrow Compression</name> + <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + <classifier>${arrow.vector.classifier}</classifier> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.20</version> + </dependency> + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + <version>1.4.9-1</version> +</dependency> + </dependencies> +</project> diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java new file mode 100644 index 000000000..867e9f418 --- /dev/null +++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +/** + * Default implementation of factory supported LZ4 and ZSTD compression. + * + * // TODO(ARROW-12115): Rename this class. 
+ */ +public class CommonsCompressionFactory implements CompressionCodec.Factory { + + public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); + + @Override + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { + switch (codecType) { + case LZ4_FRAME: + return new Lz4CompressionCodec(); + case ZSTD: + return new ZstdCompressionCodec(); + default: + throw new IllegalArgumentException("Compression type not supported: " + codecType); + } + } +} diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java new file mode 100644 index 000000000..daa35b7e1 --- /dev/null +++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +/** + * Compression codec for the LZ4 algorithm. + */ +public class Lz4CompressionCodec extends AbstractCompressionCodec { + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit %s.", Integer.MAX_VALUE); + + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + uncompressedBuffer.getBytes(/*index=*/0, inBytes); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); + } catch (IOException e) { + throw new RuntimeException(e); + } + + byte[] outBytes = baos.toByteArray(); + + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + 
Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit %s", Integer.MAX_VALUE); + + long decompressedLength = readUncompressedLength(compressedBuffer); + + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; + compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { + IOUtils.copy(in, out); + } catch (IOException e) { + throw new RuntimeException(e); + } + + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + decompressedBuffer.setBytes(/*index=*/0, outBytes); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.LZ4_FRAME; + } +} diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java new file mode 100644 index 000000000..38717843e --- /dev/null +++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +import com.github.luben.zstd.Zstd; + +/** + * Compression codec for the ZSTD algorithm. + */ +public class ZstdCompressionCodec extends AbstractCompressionCodec { + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex()); + long dstSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize; + ArrowBuf compressedBuffer = allocator.buffer(dstSize); + long bytesWritten = Zstd.compressUnsafe( + compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, dstSize, + /*src*/uncompressedBuffer.memoryAddress(), /*srcSize=*/uncompressedBuffer.writerIndex(), + /*level=*/3); + if (Zstd.isError(bytesWritten)) { + compressedBuffer.close(); + throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten)); + } + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + bytesWritten); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + long decompressedLength = readUncompressedLength(compressedBuffer); + ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength); + long decompressedSize = 
Zstd.decompressUnsafe(uncompressedBuffer.memoryAddress(), decompressedLength, + /*src=*/compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + if (Zstd.isError(decompressedSize)) { + uncompressedBuffer.close(); + throw new RuntimeException("Error decompressing: " + Zstd.getErrorName(decompressedSize)); + } + if (decompressedLength != decompressedSize) { + uncompressedBuffer.close(); + throw new RuntimeException("Expected != actual decompressed length: " + + decompressedLength + " != " + decompressedSize); + } + uncompressedBuffer.writerIndex(decompressedLength); + return uncompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.ZSTD; + } +} diff --git a/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java new file mode 100644 index 000000000..1f6d64d47 --- /dev/null +++ b/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test cases for {@link CompressionCodec}s. 
+ */ +@RunWith(Parameterized.class) +public class TestCompressionCodec { + + private final CompressionCodec codec; + + private BufferAllocator allocator; + + private final int vectorLength; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void terminate() { + allocator.close(); + } + + public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) { + this.codec = codec; + this.vectorLength = vectorLength; + } + + @Parameterized.Parameters(name = "codec = {0}, length = {1}") + public static Collection<Object[]> getCodecs() { + List<Object[]> params = new ArrayList<>(); + + int[] lengths = new int[] {10, 100, 1000}; + for (int len : lengths) { + CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; + params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec}); + + CompressionCodec lz4Codec = new Lz4CompressionCodec(); + params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec}); + + CompressionCodec zstdCodec = new ZstdCompressionCodec(); + params.add(new Object[]{zstdCodec.getCodecType(), len, zstdCodec}); + + } + return params; + } + + private List<ArrowBuf> compressBuffers(List<ArrowBuf> inputBuffers) { + List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.compress(allocator, buf)); + } + return outputBuffers; + } + + private List<ArrowBuf> deCompressBuffers(List<ArrowBuf> inputBuffers) { + List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.decompress(allocator, buf)); + } + return outputBuffers; + } + + @Test + public void testCompressFixedWidthBuffers() throws Exception { + // prepare vector to compress + IntVector origVec = new IntVector("vec", allocator); + origVec.allocateNew(vectorLength); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } 
else { + origVec.set(i, i); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List<ArrowBuf> origBuffers = origVec.getFieldBuffers(); + List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers); + List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(2, decompressedBuffers.size()); + + // orchestrate new vector + IntVector newVec = new IntVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertEquals(i, newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testCompressVariableWidthBuffers() throws Exception { + // prepare vector to compress + VarCharVector origVec = new VarCharVector("vec", allocator); + origVec.allocateNew(); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.setSafe(i, String.valueOf(i).getBytes()); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List<ArrowBuf> origBuffers = origVec.getFieldBuffers(); + List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers); + List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(3, decompressedBuffers.size()); + + // orchestrate new vector + VarCharVector newVec = new VarCharVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + 
assertArrayEquals(String.valueOf(i).getBytes(), newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testEmptyBuffer() throws Exception { + final VarBinaryVector origVec = new VarBinaryVector("vec", allocator); + + origVec.allocateNew(vectorLength); + + // Do not set any values (all missing) + origVec.setValueCount(vectorLength); + + final List<ArrowBuf> origBuffers = origVec.getFieldBuffers(); + final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers); + final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers); + + // orchestrate new vector + VarBinaryVector newVec = new VarBinaryVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, vectorLength), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + assertTrue(newVec.isNull(i)); + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } +} |