summary | refs | log | tree | commit | diff | stats
path: root/src/arrow/java/compression
diff options
context:
space:
mode:
author Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-21 11:54:28 +0000
commit e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/compression
parent Initial commit. (diff)
download ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2. [upstream/18.2.2]
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/compression')
-rw-r--r-- src/arrow/java/compression/pom.xml 52
-rw-r--r-- src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java 43
-rw-r--r-- src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java 89
-rw-r--r-- src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java 74
-rw-r--r-- src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java 213
5 files changed, 471 insertions, 0 deletions
diff --git a/src/arrow/java/compression/pom.xml b/src/arrow/java/compression/pom.xml
new file mode 100644
index 000000000..c2e460380
--- /dev/null
+++ b/src/arrow/java/compression/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Maven module descriptor for the arrow-compression artifact. -->
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Parent POM of this module; ${project.version} below resolves to the version declared here. -->
+ <parent>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-java-root</artifactId>
+ <version>6.0.1</version>
+ </parent>
+ <artifactId>arrow-compression</artifactId>
+ <name>Arrow Compression</name>
+ <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>
+
+ <dependencies>
+ <!-- Supplies the org.apache.arrow.vector.compression types the codecs in this module build on. -->
+ <!-- NOTE(review): arrow.vector.classifier is not defined in this file; presumably set by the parent POM - confirm. -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
+ </dependency>
+ <!-- Supplies ArrowBuf and BufferAllocator, which the codecs import from org.apache.arrow.memory. -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Test-scoped: only on the classpath while the module's tests compile and run. -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-unsafe</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- Supplies the framed LZ4 compressor streams and IOUtils used by Lz4CompressionCodec. -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.20</version>
+ </dependency>
+ <!-- Supplies com.github.luben.zstd.Zstd, used by ZstdCompressionCodec. -->
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>1.4.9-1</version>
+</dependency>
+ </dependencies>
+</project>
diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
new file mode 100644
index 000000000..867e9f418
--- /dev/null
+++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+
+/**
+ * A {@link CompressionCodec.Factory} that hands out codecs for the LZ4 frame
+ * and ZSTD compression types.
+ *
+ * // TODO(ARROW-12115): Rename this class.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  /** Shared instance of this factory. */
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  /**
+   * Creates a new codec for the requested compression type.
+   *
+   * @param codecType the compression type to create a codec for
+   * @return a new {@link Lz4CompressionCodec} for {@code LZ4_FRAME}, or a new
+   *     {@link ZstdCompressionCodec} for {@code ZSTD}
+   * @throws IllegalArgumentException if {@code codecType} is any other type
+   */
+  @Override
+  public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
+    final CompressionCodec codec;
+    switch (codecType) {
+      case LZ4_FRAME:
+        codec = new Lz4CompressionCodec();
+        break;
+      case ZSTD:
+        codec = new ZstdCompressionCodec();
+        break;
+      default:
+        throw new IllegalArgumentException("Compression type not supported: " + codecType);
+    }
+    return codec;
+  }
+}
diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
new file mode 100644
index 000000000..daa35b7e1
--- /dev/null
+++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.AbstractCompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ *
+ * <p>Data is staged through on-heap byte arrays and (de)compressed with the framed
+ * LZ4 streams from Apache Commons Compress, so both directions are limited to
+ * buffers of at most {@link Integer#MAX_VALUE} bytes.
+ */
+public class Lz4CompressionCodec extends AbstractCompressionCodec {
+
+  /**
+   * Compresses the first {@code writerIndex()} bytes of the given buffer into an LZ4 frame.
+   *
+   * <p>The compressed frame is written starting at offset
+   * {@link CompressionUtil#SIZE_OF_UNCOMPRESSED_LENGTH}; the bytes before that offset are
+   * not written by this method.
+   *
+   * @param allocator the allocator used to create the returned buffer
+   * @param uncompressedBuffer the data to compress
+   * @return a new buffer whose writer index is positioned right after the compressed frame
+   * @throws IllegalArgumentException if the input is larger than {@link Integer#MAX_VALUE} bytes
+   * @throws RuntimeException if the underlying compressor stream reports an {@link IOException}
+   */
+  @Override
+  protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit %s.", Integer.MAX_VALUE);
+
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    uncompressedBuffer.getBytes(/*index=*/0, inBytes);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Read the bytes only after the try block: closing the compressor stream finishes the frame.
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  /**
+   * Decompresses the LZ4 frame stored after the length prefix of the given buffer.
+   *
+   * @param allocator the allocator used to create the returned buffer
+   * @param compressedBuffer the length prefix followed by the LZ4 frame, up to {@code writerIndex()}
+   * @return a new buffer holding the decompressed bytes, with its writer index set to their length
+   * @throws IllegalArgumentException if the compressed buffer or the declared uncompressed
+   *     length does not fit in an {@code int}
+   * @throws RuntimeException if the decompressor stream reports an {@link IOException}, or if
+   *     the number of decompressed bytes differs from the declared uncompressed length
+   */
+  @Override
+  protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit %s", Integer.MAX_VALUE);
+
+    long decompressedLength = readUncompressedLength(compressedBuffer);
+    // Validate before narrowing: an out-of-range value would otherwise wrap in the int cast below.
+    Preconditions.checkArgument(decompressedLength >= 0 && decompressedLength <= Integer.MAX_VALUE,
+        "The declared uncompressed length %s is outside the supported range [0, %s]",
+        decompressedLength, Integer.MAX_VALUE);
+
+    byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
+    compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes);
+    ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength);
+    try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) {
+      IOUtils.copy(in, out);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    byte[] outBytes = out.toByteArray();
+    // Same consistency check the ZSTD codec performs on its result.
+    if (outBytes.length != decompressedLength) {
+      throw new RuntimeException("Expected != actual decompressed length: " +
+          decompressedLength + " != " + outBytes.length);
+    }
+
+    ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
+    decompressedBuffer.setBytes(/*index=*/0, outBytes);
+    // Set the writer index explicitly so the returned buffer reports how much data it holds.
+    decompressedBuffer.writerIndex(outBytes.length);
+    return decompressedBuffer;
+  }
+
+  /**
+   * Identifies this codec's compression type.
+   *
+   * @return {@link CompressionUtil.CodecType#LZ4_FRAME}
+   */
+  @Override
+  public CompressionUtil.CodecType getCodecType() {
+    return CompressionUtil.CodecType.LZ4_FRAME;
+  }
+}
diff --git a/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java
new file mode 100644
index 000000000..38717843e
--- /dev/null
+++ b/src/arrow/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.compression.AbstractCompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * Compression codec for the ZSTD algorithm.
+ */
+public class ZstdCompressionCodec extends AbstractCompressionCodec {
+
+ @Override
+ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+ long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex());
+ long dstSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize;
+ ArrowBuf compressedBuffer = allocator.buffer(dstSize);
+ long bytesWritten = Zstd.compressUnsafe(
+ compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, dstSize,
+ /*src*/uncompressedBuffer.memoryAddress(), /*srcSize=*/uncompressedBuffer.writerIndex(),
+ /*level=*/3);
+ if (Zstd.isError(bytesWritten)) {
+ compressedBuffer.close();
+ throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten));
+ }
+ compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + bytesWritten);
+ return compressedBuffer;
+ }
+
+ @Override
+ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+ long decompressedLength = readUncompressedLength(compressedBuffer);
+ ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength);
+ long decompressedSize = Zstd.decompressUnsafe(uncompressedBuffer.memoryAddress(), decompressedLength,
+ /*src=*/compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+ compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+ if (Zstd.isError(decompressedSize)) {
+ uncompressedBuffer.close();
+ throw new RuntimeException("Error decompressing: " + Zstd.getErrorName(decompressedLength));
+ }
+ if (decompressedLength != decompressedSize) {
+ uncompressedBuffer.close();
+ throw new RuntimeException("Expected != actual decompressed length: " +
+ decompressedLength + " != " + decompressedSize);
+ }
+ uncompressedBuffer.writerIndex(decompressedLength);
+ return uncompressedBuffer;
+ }
+
+ @Override
+ public CompressionUtil.CodecType getCodecType() {
+ return CompressionUtil.CodecType.ZSTD;
+ }
+}
diff --git a/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
new file mode 100644
index 000000000..1f6d64d47
--- /dev/null
+++ b/src/arrow/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Round-trip tests for {@link CompressionCodec}s: the field buffers of a vector are
+ * compressed, decompressed again and loaded into a fresh vector, which is then verified.
+ */
+@RunWith(Parameterized.class)
+public class TestCompressionCodec {
+
+  /** Vector lengths every codec is exercised with. */
+  private static final int[] VECTOR_LENGTHS = {10, 100, 1000};
+
+  private final CompressionCodec codec;
+
+  private final int vectorLength;
+
+  private BufferAllocator allocator;
+
+  /**
+   * Receives one parameter row produced by {@link #getCodecs()}.
+   *
+   * @param type the codec type; only referenced by the parameterized test name
+   * @param vectorLength number of values in the vectors under test
+   * @param codec the codec under test
+   */
+  public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) {
+    this.vectorLength = vectorLength;
+    this.codec = codec;
+  }
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+  }
+
+  @After
+  public void terminate() {
+    allocator.close();
+  }
+
+  /**
+   * Builds the parameter rows: for each vector length, the no-op codec, the LZ4 codec and
+   * the ZSTD codec, in that order.
+   */
+  @Parameterized.Parameters(name = "codec = {0}, length = {1}")
+  public static Collection<Object[]> getCodecs() {
+    List<Object[]> rows = new ArrayList<>();
+    for (int length : VECTOR_LENGTHS) {
+      addRow(rows, length, NoCompressionCodec.INSTANCE);
+      addRow(rows, length, new Lz4CompressionCodec());
+      addRow(rows, length, new ZstdCompressionCodec());
+    }
+    return rows;
+  }
+
+  /** Appends one {codec type, length, codec} row to the parameter list. */
+  private static void addRow(List<Object[]> rows, int length, CompressionCodec rowCodec) {
+    rows.add(new Object[]{rowCodec.getCodecType(), length, rowCodec});
+  }
+
+  /** Tells whether the tests leave the slot at {@code index} null (every tenth slot). */
+  private static boolean isNullSlot(int index) {
+    return index % 10 == 0;
+  }
+
+  /** Compresses every buffer with the codec under test, preserving order. */
+  private List<ArrowBuf> compressBuffers(List<ArrowBuf> inputBuffers) {
+    final int count = inputBuffers.size();
+    List<ArrowBuf> compressed = new ArrayList<>(count);
+    for (int pos = 0; pos < count; pos++) {
+      compressed.add(codec.compress(allocator, inputBuffers.get(pos)));
+    }
+    return compressed;
+  }
+
+  /** Decompresses every buffer with the codec under test, preserving order. */
+  private List<ArrowBuf> deCompressBuffers(List<ArrowBuf> inputBuffers) {
+    final int count = inputBuffers.size();
+    List<ArrowBuf> decompressed = new ArrayList<>(count);
+    for (int pos = 0; pos < count; pos++) {
+      decompressed.add(codec.decompress(allocator, inputBuffers.get(pos)));
+    }
+    return decompressed;
+  }
+
+  @Test
+  public void testCompressFixedWidthBuffers() throws Exception {
+    // source vector: every tenth slot is null, all others hold their own index
+    IntVector source = new IntVector("vec", allocator);
+    source.allocateNew(vectorLength);
+    for (int slot = 0; slot < vectorLength; slot++) {
+      if (isNullSlot(slot)) {
+        source.setNull(slot);
+      } else {
+        source.set(slot, slot);
+      }
+    }
+    source.setValueCount(vectorLength);
+    int expectedNulls = source.getNullCount();
+
+    // round-trip the field buffers through the codec
+    List<ArrowBuf> sourceBuffers = source.getFieldBuffers();
+    List<ArrowBuf> packed = compressBuffers(sourceBuffers);
+    List<ArrowBuf> unpacked = deCompressBuffers(packed);
+
+    assertEquals(2, unpacked.size());
+
+    // rebuild a vector from the round-tripped buffers
+    IntVector restored = new IntVector("new vec", allocator);
+    restored.loadFieldBuffers(new ArrowFieldNode(vectorLength, expectedNulls), unpacked);
+
+    // the restored vector must match what was written
+    assertEquals(vectorLength, restored.getValueCount());
+    for (int slot = 0; slot < vectorLength; slot++) {
+      if (isNullSlot(slot)) {
+        assertTrue(restored.isNull(slot));
+      } else {
+        assertEquals(slot, restored.get(slot));
+      }
+    }
+
+    restored.close();
+    AutoCloseables.close(unpacked);
+  }
+
+  @Test
+  public void testCompressVariableWidthBuffers() throws Exception {
+    // source vector: every tenth slot is null, all others hold their index as text
+    VarCharVector source = new VarCharVector("vec", allocator);
+    source.allocateNew();
+    for (int slot = 0; slot < vectorLength; slot++) {
+      if (isNullSlot(slot)) {
+        source.setNull(slot);
+      } else {
+        source.setSafe(slot, String.valueOf(slot).getBytes());
+      }
+    }
+    source.setValueCount(vectorLength);
+    int expectedNulls = source.getNullCount();
+
+    // round-trip the field buffers through the codec
+    List<ArrowBuf> sourceBuffers = source.getFieldBuffers();
+    List<ArrowBuf> packed = compressBuffers(sourceBuffers);
+    List<ArrowBuf> unpacked = deCompressBuffers(packed);
+
+    assertEquals(3, unpacked.size());
+
+    // rebuild a vector from the round-tripped buffers
+    VarCharVector restored = new VarCharVector("new vec", allocator);
+    restored.loadFieldBuffers(new ArrowFieldNode(vectorLength, expectedNulls), unpacked);
+
+    // the restored vector must match what was written
+    assertEquals(vectorLength, restored.getValueCount());
+    for (int slot = 0; slot < vectorLength; slot++) {
+      if (isNullSlot(slot)) {
+        assertTrue(restored.isNull(slot));
+      } else {
+        assertArrayEquals(String.valueOf(slot).getBytes(), restored.get(slot));
+      }
+    }
+
+    restored.close();
+    AutoCloseables.close(unpacked);
+  }
+
+  @Test
+  public void testEmptyBuffer() throws Exception {
+    final VarBinaryVector source = new VarBinaryVector("vec", allocator);
+    source.allocateNew(vectorLength);
+
+    // No value is ever set, so every slot stays null.
+    source.setValueCount(vectorLength);
+
+    final List<ArrowBuf> sourceBuffers = source.getFieldBuffers();
+    final List<ArrowBuf> packed = compressBuffers(sourceBuffers);
+    final List<ArrowBuf> unpacked = deCompressBuffers(packed);
+
+    // rebuild a vector from the round-tripped buffers; the null count equals the length
+    VarBinaryVector restored = new VarBinaryVector("new vec", allocator);
+    restored.loadFieldBuffers(new ArrowFieldNode(vectorLength, vectorLength), unpacked);
+
+    // every slot of the restored vector must still be null
+    assertEquals(vectorLength, restored.getValueCount());
+    for (int slot = 0; slot < vectorLength; slot++) {
+      assertTrue(restored.isNull(slot));
+    }
+
+    restored.close();
+    AutoCloseables.close(unpacked);
+  }
+}