From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/java/adapter/orc/CMakeLists.txt | 43 ++++++++ src/arrow/java/adapter/orc/pom.xml | 113 +++++++++++++++++++ .../org/apache/arrow/adapter/orc/OrcFieldNode.java | 45 ++++++++ .../org/apache/arrow/adapter/orc/OrcJniUtils.java | 62 +++++++++++ .../arrow/adapter/orc/OrcMemoryJniWrapper.java | 77 +++++++++++++ .../org/apache/arrow/adapter/orc/OrcReader.java | 90 +++++++++++++++ .../arrow/adapter/orc/OrcReaderJniWrapper.java | 79 ++++++++++++++ .../apache/arrow/adapter/orc/OrcRecordBatch.java | 47 ++++++++ .../arrow/adapter/orc/OrcReferenceManager.java | 121 +++++++++++++++++++++ .../apache/arrow/adapter/orc/OrcStripeReader.java | 109 +++++++++++++++++++ .../adapter/orc/OrcStripeReaderJniWrapper.java | 45 ++++++++ .../apache/arrow/adapter/orc/OrcReaderTest.java | 104 ++++++++++++++++++ 12 files changed, 935 insertions(+) create mode 100644 src/arrow/java/adapter/orc/CMakeLists.txt create mode 100644 src/arrow/java/adapter/orc/pom.xml create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java create mode 100644 src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java create mode 100644 src/arrow/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java (limited to 'src/arrow/java/adapter/orc') diff --git a/src/arrow/java/adapter/orc/CMakeLists.txt b/src/arrow/java/adapter/orc/CMakeLists.txt new file mode 100644 index 000000000..e2d4655d7 --- /dev/null +++ b/src/arrow/java/adapter/orc/CMakeLists.txt @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_orc_java +# + +# Headers: top level + +project(arrow_orc_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar(arrow_orc_java + src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java + src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java + src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java + GENERATE_NATIVE_HEADERS + arrow_orc_java-native + DESTINATION + ${JNI_HEADERS_DIR}) diff --git a/src/arrow/java/adapter/orc/pom.xml b/src/arrow/java/adapter/orc/pom.xml new file mode 100644 index 000000000..26f5f0c28 --- /dev/null +++ b/src/arrow/java/adapter/orc/pom.xml @@ -0,0 +1,113 @@ + + + + + 4.0.0 + + + org.apache.arrow + arrow-memory-core + ${project.version} + compile + + + org.apache.arrow + arrow-memory-netty + ${project.version} + runtime + + + org.apache.arrow + arrow-vector + ${project.version} + compile + ${arrow.vector.classifier} + + + org.apache.orc + orc-core + 1.7.0 + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-common + 2.2.0 + test + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + javax.servlet + servlet-api + + + + + org.apache.hive + hive-storage-api + 2.8.1 + test + + + + + org.apache.arrow + arrow-java-root + 6.0.1 + ../../pom.xml + + + org.apache.arrow.orc + arrow-orc + Arrow Orc Adapter + (Experimental/Contrib)A JNI wrapper for the C++ ORC reader implementation. + jar + + ../../../cpp/release-build/ + + + + + + ${arrow.cpp.build.dir} + + **/libarrow_orc_jni.* + + + + + diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java new file mode 100644 index 000000000..716a13876 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * Metadata about Vectors/Arrays that is passed via JNI interface. + */ +class OrcFieldNode { + + private final int length; + private final int nullCount; + + /** + * Construct a new instance. + * @param length the number of values written. + * @param nullCount the number of null values. + */ + public OrcFieldNode(int length, int nullCount) { + this.length = length; + this.nullCount = nullCount; + } + + int getLength() { + return length; + } + + int getNullCount() { + return nullCount; + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java new file mode 100644 index 000000000..600569be7 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +/** + * Helper class for JNI related operations. + */ +class OrcJniUtils { + private static final String LIBRARY_NAME = "arrow_orc_jni"; + private static boolean isLoaded = false; + + private OrcJniUtils() {} + + static void loadOrcAdapterLibraryFromJar() + throws IOException, IllegalAccessException { + synchronized (OrcJniUtils.class) { + if (!isLoaded) { + final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME); + final File libraryFile = moveFileFromJarToTemp( + System.getProperty("java.io.tmpdir"), libraryToLoad); + System.load(libraryFile.getAbsolutePath()); + isLoaded = true; + } + } + } + + private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad) + throws IOException { + final File temp = File.createTempFile(tmpDir, libraryToLoad); + try (final InputStream is = OrcReaderJniWrapper.class.getClassLoader() + .getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } else { + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + return temp; + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java new file mode 100644 index 000000000..473e83142 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * Wrapper for orc memory allocated by native code. + */ +class OrcMemoryJniWrapper implements AutoCloseable { + + private final long nativeInstanceId; + + private final long memoryAddress; + + private final long size; + + private final long capacity; + + /** + * Construct a new instance. + * @param nativeInstanceId unique id of the underlying memory. + * @param memoryAddress starting memory address of the underlying memory. + * @param size size of the valid data. + * @param capacity allocated memory size. + */ + OrcMemoryJniWrapper(long nativeInstanceId, long memoryAddress, long size, long capacity) { + this.nativeInstanceId = nativeInstanceId; + this.memoryAddress = memoryAddress; + this.size = size; + this.capacity = capacity; + } + + /** + * Return the size of underlying chunk of memory that has valid data. + * @return valid data size + */ + long getSize() { + return size; + } + + /** + * Return the size of underlying chunk of memory managed by this OrcMemoryJniWrapper. + * @return underlying memory size + */ + long getCapacity() { + return capacity; + } + + /** + * Return the memory address of underlying chunk of memory. + * @return memory address + */ + long getMemoryAddress() { + return memoryAddress; + } + + @Override + public void close() { + release(nativeInstanceId); + } + + private native void release(long id); +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java new file mode 100644 index 000000000..b42ddb484 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Orc Reader that allow accessing orc stripes in Orc file. + * This orc reader basically acts like an ArrowReader iterator that + * iterate over orc stripes. Each stripe will be accessed via an + * ArrowReader. + */ +public class OrcReader implements AutoCloseable { + private final OrcReaderJniWrapper jniWrapper; + private BufferAllocator allocator; + + /** + * reference to native reader instance. + */ + private final long nativeInstanceId; + + /** + * Create an OrcReader that iterate over orc stripes. + * @param filePath file path to target file, currently only support local file. + * @param allocator allocator provided to ArrowReader. + * @throws IOException throws exception in case of file not found + */ + public OrcReader(String filePath, BufferAllocator allocator) throws IOException, IllegalAccessException { + this.allocator = allocator; + this.jniWrapper = OrcReaderJniWrapper.getInstance(); + this.nativeInstanceId = jniWrapper.open(filePath); + } + + /** + * Seek to designated row. Invoke NextStripeReader() after seek + * will return stripe reader starting from designated row. + * @param rowNumber the rows number to seek + * @return true if seek operation is succeeded + */ + public boolean seek(int rowNumber) throws IllegalArgumentException { + return jniWrapper.seek(nativeInstanceId, rowNumber); + } + + /** + * Get a stripe level ArrowReader with specified batchSize in each record batch. + * + * @param batchSize the number of rows loaded on each iteration + * @return ArrowReader that iterate over current stripes + */ + public ArrowReader nextStripeReader(long batchSize) throws IllegalArgumentException { + long stripeReaderId = jniWrapper.nextStripeReader(nativeInstanceId, batchSize); + if (stripeReaderId < 0) { + return null; + } + + return new OrcStripeReader(stripeReaderId, allocator); + } + + /** + * The number of stripes in the file. + * + * @return number of stripes + */ + public int getNumberOfStripes() throws IllegalArgumentException { + return jniWrapper.getNumberOfStripes(nativeInstanceId); + } + + @Override + public void close() throws Exception { + jniWrapper.close(nativeInstanceId); + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java new file mode 100644 index 000000000..ff449c343 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; + +/** + * JNI wrapper for Orc reader. + */ +class OrcReaderJniWrapper { + + private static volatile OrcReaderJniWrapper INSTANCE; + + static OrcReaderJniWrapper getInstance() throws IOException, IllegalAccessException { + if (INSTANCE == null) { + synchronized (OrcReaderJniWrapper.class) { + if (INSTANCE == null) { + OrcJniUtils.loadOrcAdapterLibraryFromJar(); + INSTANCE = new OrcReaderJniWrapper(); + } + } + } + + return INSTANCE; + } + + /** + * Construct a orc file reader over the target file. + * @param fileName absolute file path of target file + * @return id of the orc reader instance if file opened successfully, + * otherwise return error code * -1. + */ + native long open(String fileName); + + /** + * Release resources associated with designated reader instance. + * @param readerId id of the reader instance. + */ + native void close(long readerId); + + /** + * Seek to designated row. Invoke nextStripeReader() after seek + * will return id of stripe reader starting from designated row. + * @param readerId id of the reader instance + * @param rowNumber the rows number to seek + * @return true if seek operation is succeeded + */ + native boolean seek(long readerId, int rowNumber); + + /** + * The number of stripes in the file. + * @param readerId id of the reader instance + * @return number of stripes + */ + native int getNumberOfStripes(long readerId); + + /** + * Get a stripe level ArrowReader with specified batchSize in each record batch. + * @param readerId id of the reader instance + * @param batchSize the number of rows loaded on each iteration + * @return id of the stripe reader instance. + */ + native long nextStripeReader(long readerId, long batchSize); +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java new file mode 100644 index 000000000..a006cacab --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.util.Arrays; +import java.util.List; + +/** + * Wrapper for record batch meta and native memory. + */ +class OrcRecordBatch { + final int length; + + /** + * Nodes correspond to the pre-ordered flattened logical schema. + */ + final List nodes; + + final List buffers; + + /** + * Construct a new instance. + * @param length number of records included in current batch + * @param nodes meta data for each fields + * @param buffers buffers for underlying data + */ + OrcRecordBatch(int length, OrcFieldNode[] nodes, OrcMemoryJniWrapper[] buffers) { + this.length = length; + this.nodes = Arrays.asList(nodes); + this.buffers = Arrays.asList(buffers); + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java new file mode 100644 index 000000000..fdec337e8 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OwnershipTransferResult; +import org.apache.arrow.memory.ReferenceManager; +import org.apache.arrow.util.Preconditions; + +/** + * A simple reference manager implementation for memory allocated by native code. + * The underlying memory will be released when reference count reach zero. + */ +public class OrcReferenceManager implements ReferenceManager { + private final AtomicInteger bufRefCnt = new AtomicInteger(0); + + private OrcMemoryJniWrapper memory; + + OrcReferenceManager(OrcMemoryJniWrapper memory) { + this.memory = memory; + } + + @Override + public int getRefCount() { + return bufRefCnt.get(); + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + Preconditions.checkState(decrement >= 1, + "ref count decrement should be greater than or equal to 1"); + // decrement the ref count + final int refCnt; + synchronized (this) { + refCnt = bufRefCnt.addAndGet(-decrement); + if (refCnt == 0) { + // refcount of this reference manager has dropped to 0 + // release the underlying memory + memory.close(); + } + } + // the new ref count should be >= 0 + Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); + return refCnt == 0; + } + + @Override + public void retain() { + retain(1); + } + + @Override + public void retain(int increment) { + Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment); + bufRefCnt.addAndGet(increment); + } + + @Override + public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) { + retain(); + return srcBuffer; + } + + @Override + public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) { + final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; + + // create new ArrowBuf + final ArrowBuf derivedBuf = new ArrowBuf( + this, + null, + length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf + derivedBufferAddress // starting byte address in the underlying memory for this new ArrowBuf, + ); + + return derivedBuf; + } + + @Override + public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) { + throw new UnsupportedOperationException(); + } + + @Override + public BufferAllocator getAllocator() { + return null; + } + + @Override + public long getSize() { + return memory.getSize(); + } + + @Override + public long getAccountedSize() { + return 0; + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java new file mode 100644 index 000000000..484296d92 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.stream.Collectors; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageResult; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + +/** + * Orc stripe that load data into ArrowRecordBatch. + */ +public class OrcStripeReader extends ArrowReader { + /** + * reference to native stripe reader instance. + */ + private final long nativeInstanceId; + + /** + * Construct a new instance. + * @param nativeInstanceId nativeInstanceId of the stripe reader instance, obtained by + * calling nextStripeReader from OrcReaderJniWrapper + * @param allocator memory allocator for accounting. + */ + OrcStripeReader(long nativeInstanceId, BufferAllocator allocator) { + super(allocator); + this.nativeInstanceId = nativeInstanceId; + } + + @Override + public boolean loadNextBatch() throws IOException { + OrcRecordBatch recordBatch = OrcStripeReaderJniWrapper.next(nativeInstanceId); + if (recordBatch == null) { + return false; + } + + ArrayList buffers = new ArrayList<>(); + for (OrcMemoryJniWrapper buffer : recordBatch.buffers) { + buffers.add(new ArrowBuf( + new OrcReferenceManager(buffer), + null, + (int) buffer.getSize(), + buffer.getMemoryAddress())); + } + + loadRecordBatch(new ArrowRecordBatch( + recordBatch.length, + recordBatch.nodes.stream() + .map(buf -> new ArrowFieldNode(buf.getLength(), buf.getNullCount())) + .collect(Collectors.toList()), + buffers)); + return true; + } + + @Override + public long bytesRead() { + return 0; + } + + + @Override + protected void closeReadSource() throws IOException { + OrcStripeReaderJniWrapper.close(nativeInstanceId); + } + + @Override + protected Schema readSchema() throws IOException { + byte[] schemaBytes = OrcStripeReaderJniWrapper.getSchema(nativeInstanceId); + + try (MessageChannelReader schemaReader = + new MessageChannelReader( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel(schemaBytes)), allocator)) { + + MessageResult result = schemaReader.readNext(); + if (result == null) { + throw new IOException("Unexpected end of input. Missing schema."); + } + + return MessageSerializer.deserializeSchema(result.getMessage()); + } + } +} diff --git a/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java new file mode 100644 index 000000000..1dd969861 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * JNI wrapper for orc stripe reader. + */ +class OrcStripeReaderJniWrapper { + + /** + * Get the schema of current stripe. + * @param readerId id of the stripe reader instance. + * @return serialized schema. + */ + static native byte[] getSchema(long readerId); + + /** + * Load next record batch. + * @param readerId id of the stripe reader instance. + * @return loaded record batch, return null when reached + * the end of current stripe. + */ + static native OrcRecordBatch next(long readerId); + + /** + * Release resources of underlying reader. + * @param readerId id of the stripe reader instance. + */ + static native void close(long readerId); +} diff --git a/src/arrow/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java b/src/arrow/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java new file mode 100644 index 000000000..4153a35a6 --- /dev/null +++ b/src/arrow/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.List; + + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + + +public class OrcReaderTest { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private static final int MAX_ALLOCATION = 8 * 1024; + private static RootAllocator allocator; + + @BeforeClass + public static void beforeClass() { + allocator = new RootAllocator(MAX_ALLOCATION); + } + + @Test + public void testOrcJniReader() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + File testFile = new File(testFolder.getRoot(), "test-orc"); + + Writer writer = OrcFile.createWriter(new Path(testFile.getAbsolutePath()), + OrcFile.writerOptions(new Configuration()).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector longColumnVector = (LongColumnVector) batch.cols[0]; + BytesColumnVector bytesColumnVector = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < 1024; ++r) { + int row = batch.size++; + longColumnVector.vector[row] = r; + byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8); + bytesColumnVector.setRef(row, buffer, 0, buffer.length); + } + writer.addRowBatch(batch); + writer.close(); + + OrcReader reader = new OrcReader(testFile.getAbsolutePath(), allocator); + assertEquals(1, reader.getNumberOfStripes()); + + ArrowReader stripeReader = reader.nextStripeReader(1024); + VectorSchemaRoot schemaRoot = stripeReader.getVectorSchemaRoot(); + stripeReader.loadNextBatch(); + + List fields = schemaRoot.getFieldVectors(); + assertEquals(2, fields.size()); + + IntVector intVector = (IntVector) fields.get(0); + VarCharVector varCharVector = (VarCharVector) fields.get(1); + for (int i = 0; i < 1024; ++i) { + assertEquals(i, intVector.get(i)); + assertEquals("Last-" + (i * 3), new String(varCharVector.get(i), StandardCharsets.UTF_8)); + } + + assertFalse(stripeReader.loadNextBatch()); + assertNull(reader.nextStripeReader(1024)); + + stripeReader.close(); + reader.close(); + } +} -- cgit v1.2.3