diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/dataset | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/dataset')
31 files changed, 2459 insertions, 0 deletions
diff --git a/src/arrow/java/dataset/CMakeLists.txt b/src/arrow/java/dataset/CMakeLists.txt new file mode 100644 index 000000000..07e2d0ae8 --- /dev/null +++ b/src/arrow/java/dataset/CMakeLists.txt @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# +# arrow_dataset_java +# + +# Headers: top level + +project(arrow_dataset_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar(arrow_dataset_java + src/main/java/org/apache/arrow/dataset/jni/JniLoader.java + src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java + src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java + src/main/java/org/apache/arrow/dataset/file/JniWrapper.java + src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java + src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java + GENERATE_NATIVE_HEADERS + arrow_dataset_java-native + DESTINATION + ${JNI_HEADERS_DIR}) diff --git a/src/arrow/java/dataset/pom.xml b/src/arrow/java/dataset/pom.xml new file mode 100644 index 000000000..0a393c50a --- /dev/null +++ b/src/arrow/java/dataset/pom.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>arrow-java-root</artifactId> + <groupId>org.apache.arrow</groupId> + <version>6.0.1</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>arrow-dataset</artifactId> + <name>Arrow Java Dataset</name> + <description>Java implementation of Arrow Dataset API/Framework</description> + <packaging>jar</packaging> + <properties> + <arrow.cpp.build.dir>../../../cpp/release-build/</arrow.cpp.build.dir> + <protobuf.version>2.5.0</protobuf.version> + <parquet.version>1.11.0</parquet.version> + <avro.version>1.8.2</avro.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + <classifier>${arrow.vector.classifier}</classifier> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${dep.hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + 
<artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${dep.guava.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>${arrow.cpp.build.dir}</directory> + <includes> + <include>**/libarrow_dataset_jni.*</include> + </includes> + </resource> + </resources> + + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <protoSourceRoot>../../cpp/src/jni/dataset/proto</protoSourceRoot> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>test-compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java new file mode 100644 index 000000000..e341d46be --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +/** + * File format definitions. + */ +public enum FileFormat { + PARQUET(0), + NONE(-1); + + private int id; + + FileFormat(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java new file mode 100644 index 000000000..1268d11fe --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeDatasetFactory; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.memory.BufferAllocator; + +/** + * Java binding of the C++ FileSystemDatasetFactory. + */ +public class FileSystemDatasetFactory extends NativeDatasetFactory { + + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String uri) { + super(allocator, memoryPool, createNative(format, uri)); + } + + private static long createNative(FileFormat format, String uri) { + return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); + } + +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java new file mode 100644 index 000000000..1af307aac --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.JniLoader; + +/** + * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. + */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a + * intermediate shared_ptr of the factory instance. + * @param uri file uri to read + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java new file mode 100644 index 000000000..72a1cadcf --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.util.VisibleForTesting; + +/** + * Reserving Java direct memory bytes from java.nio.Bits. Used by Java Dataset API's C++ memory + * pool implementation. This makes memory allocated by the pool to be controlled by JVM option + * "-XX:MaxDirectMemorySize". + */ +public class DirectReservationListener implements ReservationListener { + private final Method methodReserve; + private final Method methodUnreserve; + + private DirectReservationListener() { + try { + final Class<?> classBits = Class.forName("java.nio.Bits"); + methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class); + methodReserve.setAccessible(true); + methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class); + methodUnreserve.setAccessible(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final DirectReservationListener INSTANCE = new DirectReservationListener(); + + public static DirectReservationListener instance() { + return INSTANCE; + } + + /** + * Reserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#reserveMemory. 
+ */ + @Override + public void reserve(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + methodReserve.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory. + */ + @Override + public void unreserve(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + methodUnreserve.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Get current reservation of jVM direct memory. Visible for testing. + */ + @VisibleForTesting + public long getCurrentDirectMemReservation() { + try { + final Class<?> classBits = Class.forName("java.nio.Bits"); + final Field f = classBits.getDeclaredField("reservedMemory"); + f.setAccessible(true); + return ((AtomicLong) f.get(null)).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java new file mode 100644 index 000000000..15ce5448b --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * The JniLoader for Dataset API's native implementation. + */ +public final class JniLoader { + + private static final JniLoader INSTANCE = new JniLoader(Collections.singletonList("arrow_dataset_jni")); + + public static JniLoader get() { + return INSTANCE; + } + + private final Set<String> librariesToLoad; + + private JniLoader(List<String> libraryNames) { + librariesToLoad = new HashSet<>(libraryNames); + } + + private boolean finished() { + return librariesToLoad.isEmpty(); + } + + /** + * If required JNI libraries are not loaded, then load them. + */ + public void ensureLoaded() { + if (finished()) { + return; + } + loadRemaining(); + } + + private synchronized void loadRemaining() { + // The method is protected by a mutex via synchronized, if more than one thread race to call + // loadRemaining, at same time only one will do the actual loading and the others will wait for + // the mutex to be acquired then check on the remaining list: if there are libraries that were not + // successfully loaded then the mutex owner will try to load them again. 
+ if (finished()) { + return; + } + List<String> libs = new ArrayList<>(librariesToLoad); + for (String lib : libs) { + load(lib); + librariesToLoad.remove(lib); + } + } + + private void load(String name) { + final String libraryToLoad = System.mapLibraryName(name); + try { + File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir"))); + try (final InputStream is + = JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + System.load(temp.getAbsolutePath()); + } + } catch (IOException e) { + throw new IllegalStateException("error loading native libraries: " + e); + } + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java new file mode 100644 index 000000000..7dd54e764 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +/** + * JNI wrapper for Dataset API's native implementation. + */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + /** + * Release the DatasetFactory by destroying its reference held by JNI wrapper. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + */ + public native void closeDatasetFactory(long datasetFactoryId); + + /** + * Get a serialized schema from native instance of a DatasetFactory. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + * @return the serialized schema + * @see org.apache.arrow.vector.types.pojo.Schema + */ + public native byte[] inspectSchema(long datasetFactoryId); + + /** + * Create Dataset from a DatasetFactory and get the native pointer of the Dataset. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + * @param schema the predefined schema of the resulting Dataset. + * @return the native pointer of the arrow::dataset::Dataset instance. + */ + public native long createDataset(long datasetFactoryId, byte[] schema); + + /** + * Release the Dataset by destroying its reference held by JNI wrapper. + * + * @param datasetId the native pointer of the arrow::dataset::Dataset instance. + */ + public native void closeDataset(long datasetId); + + /** + * Create Scanner from a Dataset and get the native pointer of the Dataset. + * @param datasetId the native pointer of the arrow::dataset::Dataset instance. + * @param columns desired column names. + * Columns not in this list will not be emitted when performing scan operation. Null equals + * to "all columns". + * @param batchSize batch size of scanned record batches. 
+ * @param memoryPool identifier of memory pool used in the native scanner. + * @return the native pointer of the arrow::dataset::Scanner instance. + */ + public native long createScanner(long datasetId, String[] columns, long batchSize, long memoryPool); + + /** + * Get a serialized schema from native instance of a Scanner. + * + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + * @return the serialized schema + * @see org.apache.arrow.vector.types.pojo.Schema + */ + public native byte[] getSchemaFromScanner(long scannerId); + + /** + * Release the Scanner by destroying its reference held by JNI wrapper. + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + */ + public native void closeScanner(long scannerId); + + /** + * Read next record batch from the specified scanner. + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch. + */ + public native NativeRecordBatchHandle nextRecordBatch(long scannerId); + + /** + * Release the Buffer by destroying its reference held by JNI wrapper. + * @param bufferId the native pointer of the arrow::Buffer instance. + */ + public native void releaseBuffer(long bufferId); + +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java new file mode 100644 index 000000000..7f6dfbc02 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.memory.BufferAllocator; + +/** + * Context for relevant classes of NativeDataset. + */ +public class NativeContext { + private final BufferAllocator allocator; + private final NativeMemoryPool memoryPool; + + /** + * Constructor. + * + * @param allocator The allocator in use. + * @param memoryPool Native memory pool. + */ + public NativeContext(BufferAllocator allocator, NativeMemoryPool memoryPool) { + this.allocator = allocator; + this.memoryPool = memoryPool; + } + + /** + * Returns the allocator which is in use. + */ + public BufferAllocator getAllocator() { + return allocator; + } + + /** + * Returns the native memory pool. + */ + public NativeMemoryPool getMemoryPool() { + return memoryPool; + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java new file mode 100644 index 000000000..30ff1a930 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.source.Dataset; + +/** + * Native implementation of {@link Dataset}. + */ +public class NativeDataset implements Dataset { + + private final NativeContext context; + private final long datasetId; + + private boolean closed = false; + + public NativeDataset(NativeContext context, long datasetId) { + this.context = context; + this.datasetId = datasetId; + } + + @Override + public synchronized NativeScanner newScan(ScanOptions options) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), + options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); + return new NativeScanner(context, scannerId); + } + + @Override + public synchronized void close() { + if (closed) { + return; + } + closed = true; + JniWrapper.get().closeDataset(datasetId); + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java new file mode 100644 index 000000000..993d44fa2 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; + +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.SchemaUtility; + +/** + * Native implementation of {@link DatasetFactory}. + */ +public class NativeDatasetFactory implements DatasetFactory { + private final long datasetFactoryId; + private final NativeMemoryPool memoryPool; + private final BufferAllocator allocator; + + private boolean closed = false; + + /** + * Constructor. + * + * @param allocator a context allocator associated with this factory. Any buffer that will be created natively will + * be then bound to this allocator. + * @param memoryPool the native memory pool associated with this factory. Any buffer created natively should request + * for memory spaces from this memory pool. This is a mapped instance of c++ arrow::MemoryPool. + * @param datasetFactoryId an ID, at the same time the native pointer of the underlying native instance of this + * factory. 
Make sure in c++ side the pointer is pointing to the shared pointer wrapping + * the actual instance so we could successfully decrease the reference count once + * {@link #close} is called. + * @see #close() + */ + public NativeDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, long datasetFactoryId) { + this.allocator = allocator; + this.memoryPool = memoryPool; + this.datasetFactoryId = datasetFactoryId; + } + + @Override + public Schema inspect() { + final byte[] buffer; + synchronized (this) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + buffer = JniWrapper.get().inspectSchema(datasetFactoryId); + } + try { + return SchemaUtility.deserialize(buffer, allocator); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public NativeDataset finish() { + return finish(inspect()); + } + + @Override + public NativeDataset finish(Schema schema) { + try { + byte[] serialized = SchemaUtility.serialize(schema); + synchronized (this) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + return new NativeDataset(new NativeContext(allocator, memoryPool), + JniWrapper.get().createDataset(datasetFactoryId, serialized)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Close this factory by release the pointer of the native instance. 
+ */ + @Override + public synchronized void close() { + if (closed) { + return; + } + closed = true; + JniWrapper.get().closeDatasetFactory(datasetFactoryId); + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java new file mode 100644 index 000000000..3231ca23a --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +/** + * Thrown if trying to operate on a native instance that is already released. 
+ */ +public class NativeInstanceReleasedException extends RuntimeException { + public NativeInstanceReleasedException() { + super("Native instance has been released"); + } + + public NativeInstanceReleasedException(String message) { + super(message); + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java new file mode 100644 index 000000000..83825776b --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +/** + * C++ memory pool(arrow::MemoryPool)'s Java mapped instance. + */ +public class NativeMemoryPool implements AutoCloseable { + private final long nativeInstanceId; + + static { + JniLoader.get().ensureLoaded(); + } + + private NativeMemoryPool(long nativeInstanceId) { + this.nativeInstanceId = nativeInstanceId; + } + + /** + * Get the default memory pool. This will return arrow::default_memory_pool() directly. 
+ */ + public static NativeMemoryPool getDefault() { + return new NativeMemoryPool(getDefaultMemoryPool()); + } + + /** + * Create a listenable memory pool (see also: arrow::ReservationListenableMemoryPool) with + * a specific listener. All buffers created from the memory pool should take enough reservation + * from the listener in advance. + */ + public static NativeMemoryPool createListenable(ReservationListener listener) { + return new NativeMemoryPool(createListenableMemoryPool(listener)); + } + + /** + * Return native instance ID of this memory pool. + */ + public long getNativeInstanceId() { + return nativeInstanceId; + } + + /** + * Get current allocated bytes. + */ + public long getBytesAllocated() { + return bytesAllocated(nativeInstanceId); + } + + @Override + public void close() throws Exception { + releaseMemoryPool(nativeInstanceId); + } + + private static native long getDefaultMemoryPool(); + + private static native long createListenableMemoryPool(ReservationListener listener); + + private static native void releaseMemoryPool(long id); + + private static native long bytesAllocated(long id); +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java new file mode 100644 index 000000000..dd90fd1c1 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.util.Arrays; +import java.util.List; + +/** + * Hold pointers to a Arrow C++ RecordBatch. + */ +public class NativeRecordBatchHandle { + + private final long numRows; + private final List<Field> fields; + private final List<Buffer> buffers; + + /** + * Constructor. + * + * @param numRows Total row number of the associated RecordBatch + * @param fields Metadata of fields + * @param buffers Retained Arrow buffers + */ + public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) { + this.numRows = numRows; + this.fields = Arrays.asList(fields); + this.buffers = Arrays.asList(buffers); + } + + /** + * Returns the total row number of the associated RecordBatch. + * @return Total row number of the associated RecordBatch. + */ + public long getNumRows() { + return numRows; + } + + /** + * Returns Metadata of fields. + * @return Metadata of fields. + */ + public List<Field> getFields() { + return fields; + } + + /** + * Returns the buffers. + * @return Retained Arrow buffers. + */ + public List<Buffer> getBuffers() { + return buffers; + } + + /** + * Field metadata. + */ + public static class Field { + public final long length; + public final long nullCount; + + public Field(long length, long nullCount) { + this.length = length; + this.nullCount = nullCount; + } + } + + /** + * Pointers and metadata of the targeted Arrow buffer. 
+ */ + public static class Buffer { + public final long nativeInstanceId; + public final long memoryAddress; + public final long size; + public final long capacity; + + /** + * Constructor. + * + * @param nativeInstanceId Native instance's id + * @param memoryAddress Memory address of the first byte + * @param size Size (in bytes) + * @param capacity Capacity (in bytes) + */ + public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) { + this.nativeInstanceId = nativeInstanceId; + this.memoryAddress = memoryAddress; + this.size = size; + this.capacity = capacity; + } + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java new file mode 100644 index 000000000..14d89c2ee --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.scanner.ScanTask; + +/** + * Native implementation of {@link ScanTask}. 
Currently RecordBatches are iterated directly by the scanner + * id via {@link JniWrapper}, thus we allow only one-time execution of method {@link #execute()}. If a re-scan + * operation is expected, call {@link NativeDataset#newScan} to create a new scanner instance. + */ +public class NativeScanTask implements ScanTask { + private final NativeScanner scanner; + + /** + * Constructor. + */ + public NativeScanTask(NativeScanner scanner) { + this.scanner = scanner; + } + + @Override + public BatchIterator execute() { + return scanner.execute(); + } + + @Override + public void close() { + scanner.close(); + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java new file mode 100644 index 000000000..24c298067 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.BufferLedger; +import org.apache.arrow.memory.NativeUnderlyingMemory; +import org.apache.arrow.memory.util.LargeMemoryUtil; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.SchemaUtility; + +/** + * Native implementation of {@link Scanner}. Note that it currently emits only a single scan task of type + * {@link NativeScanTask}, which is internally a combination of all scan task instances returned by the + * native scanner. + */ +public class NativeScanner implements Scanner { + + private final AtomicBoolean executed = new AtomicBoolean(false); + private final NativeContext context; + private final long scannerId; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock writeLock = lock.writeLock(); + private final Lock readLock = lock.readLock(); + private boolean closed = false; + + public NativeScanner(NativeContext context, long scannerId) { + this.context = context; + this.scannerId = scannerId; + } + + ScanTask.BatchIterator execute() { + if (closed) { + throw new NativeInstanceReleasedException(); + } + if (!executed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. 
Consider creating " + + "new scanner instead"); + } + return new ScanTask.BatchIterator() { + private ArrowRecordBatch peek = null; + + @Override + public void close() { + NativeScanner.this.close(); + } + + @Override + public boolean hasNext() { + if (peek != null) { + return true; + } + final NativeRecordBatchHandle handle; + readLock.lock(); + try { + if (closed) { + throw new NativeInstanceReleasedException(); + } + handle = JniWrapper.get().nextRecordBatch(scannerId); + } finally { + readLock.unlock(); + } + if (handle == null) { + return false; + } + final ArrayList<ArrowBuf> buffers = new ArrayList<>(); + for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) { + final BufferAllocator allocator = context.getAllocator(); + final int size = LargeMemoryUtil.checkedCastToInt(buffer.size); + final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator, + size, buffer.nativeInstanceId, buffer.memoryAddress); + BufferLedger ledger = am.associate(allocator); + ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress); + buffers.add(buf); + } + + try { + final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows()); + peek = new ArrowRecordBatch(numRows, handle.getFields().stream() + .map(field -> new ArrowFieldNode(field.length, field.nullCount)) + .collect(Collectors.toList()), buffers); + return true; + } finally { + buffers.forEach(buffer -> buffer.getReferenceManager().release()); + } + } + + @Override + public ArrowRecordBatch next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + try { + return peek; + } finally { + peek = null; + } + } + }; + } + + @Override + public Iterable<? 
extends NativeScanTask> scan() { + if (closed) { + throw new NativeInstanceReleasedException(); + } + return Collections.singletonList(new NativeScanTask(this)); + } + + @Override + public Schema schema() { + readLock.lock(); + try { + if (closed) { + throw new NativeInstanceReleasedException(); + } + return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + readLock.unlock(); + } + } + + @Override + public void close() { + writeLock.lock(); + try { + if (closed) { + return; + } + closed = true; + JniWrapper.get().closeScanner(scannerId); + } finally { + writeLock.unlock(); + } + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java new file mode 100644 index 000000000..f1ffdd2ac --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +/** + * Listener of buffer memory reservation. 
Used by native datasets. + */ +public interface ReservationListener { + + /** + * Reserve bytes. + * + * @throws RuntimeException if request size cannot be granted + */ + void reserve(long size); + + /** + * Unreserve bytes. + */ + void unreserve(long size); +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java new file mode 100644 index 000000000..f5a1af384 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import java.util.Optional; + +import org.apache.arrow.util.Preconditions; + +/** + * Options used during scanning. + */ +public class ScanOptions { + private final Optional<String[]> columns; + private final long batchSize; + + /** + * Constructor. + * @param columns Projected columns. Empty for scanning all columns. + * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} + * + * @deprecated Deprecated. 
Use {@link #ScanOptions(long, Optional)} instead. + */ + @Deprecated + public ScanOptions(String[] columns, long batchSize) { + this(batchSize, Optional.of(columns).map(present -> { + if (present.length == 0) { + // Backwards compatibility: See ARROW-13257, in the new constructor, we now use null to scan for all columns. + return null; + } + return present; + })); + } + + /** + * Constructor. + * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} + * @param columns (Optional) Projected columns. {@link Optional#empty()} for scanning all columns. Otherwise, + * Only columns present in the Array will be scanned. + */ + public ScanOptions(long batchSize, Optional<String[]> columns) { + Preconditions.checkNotNull(columns); + this.batchSize = batchSize; + this.columns = columns; + } + + public ScanOptions(long batchSize) { + this(batchSize, Optional.empty()); + } + + public Optional<String[]> getColumns() { + return columns; + } + + public long getBatchSize() { + return batchSize; + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java new file mode 100644 index 000000000..d07036a61 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import java.util.Iterator; + +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +/** + * Read record batches from a range of a single data fragment. A + * ScanTask is meant to be a unit of work to be dispatched. The implementation + * must be thread and concurrent safe. + */ +public interface ScanTask extends AutoCloseable { + + /** + * Creates and returns a {@link BatchIterator} instance. + */ + BatchIterator execute(); + + /** + * The iterator implementation for {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}s. + */ + interface BatchIterator extends Iterator<ArrowRecordBatch>, AutoCloseable { + + } +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java new file mode 100644 index 000000000..93a1b08f3 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * A high level interface for scanning data over dataset. + */ +public interface Scanner extends AutoCloseable { + + /** + * Perform the scan operation. + * + * @return a iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed + * to execute the tasks concurrently to gain better performance. + */ + Iterable<? extends ScanTask> scan(); + + /** + * Get the schema of this Scanner. + * + * @return the schema instance + */ + Schema schema(); +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java new file mode 100644 index 000000000..ce193581f --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.source; + +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; + +/** + * A container of Fragments which are the internal iterable unit of read data. + */ +public interface Dataset extends AutoCloseable { + + /** + * Create a new Scanner using the provided scan options. + * + * @param options options used during creating Scanner + * @return the Scanner instance + */ + Scanner newScan(ScanOptions options); +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java new file mode 100644 index 000000000..46b8545d6 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.source; + +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * DatasetFactory provides a way to inspect a Dataset potential + * schema before materializing it. Thus, the user can peek the schema for + * data sources and decide on a unified schema. + */ +public interface DatasetFactory extends AutoCloseable { + + /** + * Get unified schema for the resulting Dataset. + * + * @return the schema object inspected + */ + Schema inspect(); + + /** + * Create a Dataset with auto-inferred schema. Which means, the schema of the resulting Dataset will be + * the same with calling {@link #inspect()} manually. + * + * @return the Dataset instance + */ + Dataset finish(); + + /** + * Create a Dataset with predefined schema. Schema inference will not be performed. + * + * @param schema a predefined schema + * @return the Dataset instance + */ + Dataset finish(Schema schema); +} diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java new file mode 100644 index 000000000..963fb6170 --- /dev/null +++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import org.apache.arrow.dataset.jni.JniWrapper; + +/** + * AllocationManager implementation for native allocated memory. + */ +public class NativeUnderlyingMemory extends AllocationManager { + + private final int size; + private final long nativeInstanceId; + private final long address; + + /** + * Constructor. + * + * @param accountingAllocator The accounting allocator instance + * @param size Size of underlying memory (in bytes) + * @param nativeInstanceId ID of the native instance + */ + NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) { + super(accountingAllocator); + this.size = size; + this.nativeInstanceId = nativeInstanceId; + this.address = address; + // pre-allocate bytes on accounting allocator + final AllocationListener listener = accountingAllocator.getListener(); + try (final AllocationReservation reservation = accountingAllocator.newReservation()) { + listener.onPreAllocation(size); + reservation.reserve(size); + listener.onAllocation(size); + } catch (Exception e) { + release0(); + throw e; + } + } + + /** + * Alias to constructor. 
+ */ + public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId, + long address) { + return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address); + } + + public BufferLedger associate(BufferAllocator allocator) { + return super.associate(allocator); + } + + @Override + protected void release0() { + JniWrapper.get().releaseBuffer(nativeInstanceId); + } + + @Override + public long getSize() { + return size; + } + + @Override + protected long memoryAddress() { + return address; + } +} diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java new file mode 100644 index 000000000..c6299d135 --- /dev/null +++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.util.Preconditions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; + +/** + * Utility class for writing Parquet files using Avro based tools. + */ +public class ParquetWriteSupport implements AutoCloseable { + + private final String path; + private final String uri; + private final ParquetWriter<GenericRecord> writer; + private final Schema avroSchema; + private final List<GenericRecord> writtenRecords = new ArrayList<>(); + private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder(); + + + public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception { + avroSchema = readSchemaFromFile(schemaName); + path = outputFolder.getPath() + File.separator + "generated.parquet"; + uri = "file://" + path; + writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path)) + .withSchema(avroSchema) + .build(); + } + + private static Schema readSchemaFromFile(String schemaName) throws Exception { + Path schemaPath = Paths.get(ParquetWriteSupport.class.getResource("/").getPath(), + "avroschema", schemaName); + return new org.apache.avro.Schema.Parser().parse(schemaPath.toFile()); + } + + public static ParquetWriteSupport writeTempFile(String schemaName, File outputFolder, + Object... values) throws Exception { + try (final ParquetWriteSupport writeSupport = new ParquetWriteSupport(schemaName, outputFolder)) { + writeSupport.writeRecords(values); + return writeSupport; + } + } + + public void writeRecords(Object... 
values) throws Exception { + final List<GenericRecord> valueList = getRecordListBuilder().createRecordList(values); + writeRecords(valueList); + } + + public void writeRecords(List<GenericRecord> records) throws Exception { + for (GenericRecord record : records) { + writeRecord(record); + } + } + + public void writeRecord(GenericRecord record) throws Exception { + writtenRecords.add(record); + writer.write(record); + } + + public String getOutputURI() { + return uri; + } + + public Schema getAvroSchema() { + return avroSchema; + } + + public GenericRecordListBuilder getRecordListBuilder() { + return recordListBuilder; + } + + public List<GenericRecord> getWrittenRecords() { + return Collections.unmodifiableList(writtenRecords); + } + + @Override + public void close() throws Exception { + writer.close(); + } + + public class GenericRecordListBuilder { + public final List<GenericRecord> createRecordList(Object... values) { + final int fieldCount = avroSchema.getFields().size(); + Preconditions.checkArgument(values.length % fieldCount == 0, + "arg count of values should be divide by field number"); + final List<GenericRecord> recordList = new ArrayList<>(); + for (int i = 0; i < values.length / fieldCount; i++) { + final GenericRecord record = new GenericData.Record(avroSchema); + for (int j = 0; j < fieldCount; j++) { + record.put(j, values[i * fieldCount + j]); + } + recordList.add(record); + } + return Collections.unmodifiableList(recordList); + } + } +} diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java new file mode 100644 index 000000000..51dac15e5 --- /dev/null +++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.arrow.dataset;

import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.After;
import org.junit.Before;

/**
 * Base class for dataset tests: manages a per-test {@link RootAllocator} and provides helpers
 * to scan a {@link DatasetFactory} to completion and to adapt iterables/iterators to streams.
 */
public abstract class TestDataset {
  private RootAllocator allocator = null;

  @Before
  public void setUp() {
    allocator = new RootAllocator(Long.MAX_VALUE);
  }

  @After
  public void tearDown() {
    allocator.close();
  }

  /** The allocator for the currently running test. */
  protected RootAllocator rootAllocator() {
    return allocator;
  }

  /**
   * Finishes the factory, scans with the given options and collects every record batch from
   * every scan task. Callers own the returned batches and must close them.
   *
   * <p>The scanner and dataset are closed on every path — including scan failure — so native
   * resources do not leak across tests.
   */
  protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory, ScanOptions options) {
    final Dataset dataset = factory.finish();
    final Scanner scanner = dataset.newScan(options);
    try {
      return stream(scanner.scan())
          .flatMap(t -> stream(t.execute()))
          .collect(Collectors.toList());
    } finally {
      closeOrThrow(scanner, dataset);
    }
  }

  /**
   * Finishes the factory and returns the schema the scanner reports for the given options.
   * The scanner and dataset are closed on every path.
   */
  protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) {
    final Dataset dataset = factory.finish();
    final Scanner scanner = dataset.newScan(options);
    try {
      return scanner.schema();
    } finally {
      closeOrThrow(scanner, dataset);
    }
  }

  // Closes native resources, rethrowing any checked close failure unchecked (same contract as
  // the original inline close calls). NOTE(review): when invoked from a finally block a close
  // failure will mask an in-flight primary exception.
  private static void closeOrThrow(AutoCloseable... closeables) {
    try {
      AutoCloseables.close(closeables);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** Adapts an {@link Iterable} to a sequential {@link Stream}. */
  protected <T> Stream<T> stream(Iterable<T> iterable) {
    return StreamSupport.stream(iterable.spliterator(), false);
  }

  /** Collects an {@link Iterable} into a {@link List}. */
  protected <T> List<T> collect(Iterable<T> iterable) {
    return stream(iterable).collect(Collectors.toList());
  }

  /** Adapts an {@link Iterator} to a sequential ordered {@link Stream}. */
  protected <T> Stream<T> stream(Iterator<T> iterator) {
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
  }

  /** Collects an {@link Iterator} into a {@link List}. */
  protected <T> List<T> collect(Iterator<T> iterator) {
    return stream(iterator).collect(Collectors.toList());
  }
}
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.arrow.dataset.file;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.arrow.dataset.ParquetWriteSupport;
import org.apache.arrow.dataset.jni.NativeDataset;
import org.apache.arrow.dataset.jni.NativeInstanceReleasedException;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.jni.NativeScanTask;
import org.apache.arrow.dataset.jni.NativeScanner;
import org.apache.arrow.dataset.jni.TestNativeDataset;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;

/**
 * End-to-end tests for reading Parquet files through the JNI-backed file-system dataset:
 * writes a small file with {@link ParquetWriteSupport}, scans it back through
 * {@link FileSystemDatasetFactory}, and verifies schema, contents, projection, batching,
 * close semantics and memory accounting.
 */
public class TestFileSystemDataset extends TestNativeDataset {

  @ClassRule
  public static final TemporaryFolder TMP = new TemporaryFolder();

  // Test schema resource with two fields: id (int) and name (string), see user.avsc.
  public static final String AVRO_SCHEMA_USER = "user.avsc";

  /** Full read of a one-row file: schema (id:int, name:varchar) and contents round-trip. */
  @Test
  public void testBaseParquetRead() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    ScanOptions options = new ScanOptions(100);
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

    assertSingleTaskProduced(factory, options);
    assertEquals(1, datum.size());
    assertEquals(2, schema.getFields().size());
    assertEquals("id", schema.getFields().get(0).getName());
    assertEquals("name", schema.getFields().get(1).getName());
    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
    assertEquals(Types.MinorType.VARCHAR.getType(), schema.getFields().get(1).getType());
    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);

    AutoCloseables.close(datum);
  }

  /** Column projection: selecting only "id" yields a one-field schema and matching data. */
  @Test
  public void testParquetProjectSingleColumn() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    ScanOptions options = new ScanOptions(100, Optional.of(new String[]{"id"}));
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
    // Expected records are built against the schema truncated to its first field only.
    org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1);

    assertSingleTaskProduced(factory, options);
    assertEquals(1, schema.getFields().size());
    assertEquals("id", schema.getFields().get(0).getName());
    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
    assertEquals(1, datum.size());
    checkParquetReadResult(schema,
        Collections.singletonList(
            new GenericRecordBuilder(
                expectedSchema)
                .set("id", 1)
                .build()), datum);

    AutoCloseables.close(datum);
  }

  /** Batch size 1 over three rows must produce three single-row batches. */
  @Test
  public void testParquetBatchSize() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
        1, "a", 2, "b", 3, "c");

    ScanOptions options = new ScanOptions(1);
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

    assertSingleTaskProduced(factory, options);
    assertEquals(3, datum.size());
    datum.forEach(batch -> assertEquals(1, batch.getLength()));
    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);

    AutoCloseables.close(datum);
  }

  /** An explicit empty projection selects zero columns (not all columns). */
  @Test
  public void testEmptyProjectSelectsZeroColumns() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    ScanOptions options = new ScanOptions(100, Optional.of(new String[0]));
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
    // A field-less Avro record schema to build the "empty" expected record against.
    org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList());

    assertSingleTaskProduced(factory, options);
    assertEquals(0, schema.getFields().size());
    assertEquals(1, datum.size());
    checkParquetReadResult(schema,
        Collections.singletonList(
            new GenericRecordBuilder(
                expectedSchema)
                .build()), datum);

    AutoCloseables.close(datum);
  }

  /** An absent (empty Optional) projection selects all columns. */
  @Test
  public void testNullProjectSelectsAllColumns() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    ScanOptions options = new ScanOptions(100, Optional.empty());
    Schema schema = inferResultSchemaFromFactory(factory, options);
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

    assertSingleTaskProduced(factory, options);
    assertEquals(1, datum.size());
    assertEquals(2, schema.getFields().size());
    assertEquals("id", schema.getFields().get(0).getName());
    assertEquals("name", schema.getFields().get(1).getName());
    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
    assertEquals(Types.MinorType.VARCHAR.getType(), schema.getFields().get(1).getType());
    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);

    AutoCloseables.close(datum);
  }

  /** Closing a native dataset twice must be idempotent. */
  @Test
  public void testNoErrorWhenCloseAgain() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());

    assertDoesNotThrow(() -> {
      NativeDataset dataset = factory.finish();
      dataset.close();
      dataset.close();
    });
  }

  /**
   * A NativeScanner is single-use: after one task has executed, executing a task from a second
   * scan() call on the same scanner must fail with UnsupportedOperationException.
   */
  @Test
  public void testErrorThrownWhenScanAgain() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    NativeDataset dataset = factory.finish();
    ScanOptions options = new ScanOptions(100);
    NativeScanner scanner = dataset.newScan(options);
    List<? extends NativeScanTask> taskList1 = collect(scanner.scan());
    List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
    NativeScanTask task1 = taskList1.get(0);
    NativeScanTask task2 = taskList2.get(0);
    List<ArrowRecordBatch> datum = collect(task1.execute());

    UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute);
    Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
        uoe.getMessage());

    AutoCloseables.close(datum);
    AutoCloseables.close(taskList1);
    AutoCloseables.close(taskList2);
    AutoCloseables.close(scanner, dataset, factory);
  }

  /** A scan task created on one thread can be executed on another. */
  @Test
  public void testScanInOtherThread() throws Exception {
    // NOTE(review): the executor is never shut down; the JVM exits tests regardless, but a
    // shutdown() in a finally block would be cleaner.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    NativeDataset dataset = factory.finish();
    ScanOptions options = new ScanOptions(100);
    NativeScanner scanner = dataset.newScan(options);
    List<? extends NativeScanTask> taskList = collect(scanner.scan());
    NativeScanTask task = taskList.get(0);
    List<ArrowRecordBatch> datum = executor.submit(() -> collect(task.execute())).get();

    AutoCloseables.close(datum);
    AutoCloseables.close(taskList);
    AutoCloseables.close(scanner, dataset, factory);
  }

  /** scan() after the scanner has been closed must report the native instance as released. */
  @Test
  public void testErrorThrownWhenScanAfterScannerClose() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    NativeDataset dataset = factory.finish();
    ScanOptions options = new ScanOptions(100);
    NativeScanner scanner = dataset.newScan(options);
    scanner.close();
    assertThrows(NativeInstanceReleasedException.class, scanner::scan);
  }

  /** execute() after the task has been closed must report the native instance as released. */
  @Test
  public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    NativeDataset dataset = factory.finish();
    ScanOptions options = new ScanOptions(100);
    NativeScanner scanner = dataset.newScan(options);
    List<? extends NativeScanTask> tasks = collect(scanner.scan());
    NativeScanTask task = tasks.get(0);
    task.close();
    assertThrows(NativeInstanceReleasedException.class, task::execute);
  }

  /** An iterator obtained before the task is closed must fail once the task is closed. */
  @Test
  public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    NativeDataset dataset = factory.finish();
    ScanOptions options = new ScanOptions(100);
    NativeScanner scanner = dataset.newScan(options);
    List<? extends NativeScanTask> tasks = collect(scanner.scan());
    NativeScanTask task = tasks.get(0);
    ScanTask.BatchIterator iterator = task.execute();
    task.close();
    assertThrows(NativeInstanceReleasedException.class, iterator::hasNext);
  }

  /**
   * Buffers produced by the scan must be accounted against the Java allocator passed to the
   * factory: allocation grows by the batches' accounted size and returns to the baseline once
   * the batches are closed.
   */
  @Test
  public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, writeSupport.getOutputURI());
    ScanOptions options = new ScanOptions(100);
    long initReservation = rootAllocator().getAllocatedMemory();
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
    // Sum of the accounted sizes of all buffers across all collected batches.
    final long expected_diff = datum.stream()
        .flatMapToLong(batch -> batch.getBuffers()
            .stream()
            .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
    long reservation = rootAllocator().getAllocatedMemory();
    AutoCloseables.close(datum);
    long finalReservation = rootAllocator().getAllocatedMemory();
    Assert.assertEquals(expected_diff, reservation - initReservation);
    Assert.assertEquals(-expected_diff, finalReservation - reservation);
  }

  /**
   * Verifies that the Arrow batches in {@code actual} carry exactly the rows of {@code expected}
   * (in order), comparing cell-by-cell via string rendering. Closes every batch as it is
   * checked, even if an assertion fails mid-batch.
   */
  private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
    assertEquals(expected.size(), actual.stream()
        .mapToInt(ArrowRecordBatch::getLength)
        .sum());
    final int fieldCount = schema.getFields().size();
    // Consumed from the front as batches are verified, so index j below is batch-relative.
    LinkedList<GenericRecord> expectedRemovable = new LinkedList<>(expected);
    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
      VectorLoader loader = new VectorLoader(vsr);
      for (ArrowRecordBatch batch : actual) {
        try {
          assertEquals(fieldCount, batch.getNodes().size());
          loader.load(batch);
          int batchRowCount = vsr.getRowCount();
          for (int i = 0; i < fieldCount; i++) {
            FieldVector vector = vsr.getVector(i);
            for (int j = 0; j < batchRowCount; j++) {
              Object object = vector.getObject(j);
              Object expectedObject = expectedRemovable.get(j).get(i);
              // String rendering bridges Avro vs Arrow value representations.
              assertEquals(Objects.toString(expectedObject),
                  Objects.toString(object));
            }
          }
          // Drop the rows this batch covered before moving to the next batch.
          for (int i = 0; i < batchRowCount; i++) {
            expectedRemovable.poll();
          }
        } finally {
          batch.close();
        }
      }
      assertTrue(expectedRemovable.isEmpty());
    }
  }

  /** Returns a copy of {@code schema} keeping only fields [from, to). */
  private org.apache.avro.Schema truncateAvroSchema(org.apache.avro.Schema schema, int from, int to) {
    List<org.apache.avro.Schema.Field> fields = schema.getFields().subList(from, to);
    return org.apache.avro.Schema.createRecord(
        fields.stream()
            // Avro Fields cannot be reused across schemas, so rebuild each one.
            .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
            .collect(Collectors.toList()));
  }
}
agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.arrow.dataset.file;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.RootAllocator;
import org.junit.Test;

/**
 * Tests of {@link FileSystemDatasetFactory} construction and close semantics.
 */
public class TestFileSystemDatasetFactory {

  /** Creating a factory with {@code FileFormat.NONE} must fail with a descriptive message. */
  @Test
  public void testErrorHandling() {
    // try-with-resources so the allocator is released even when the assertion fails
    // (the original code leaked it).
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      RuntimeException e = assertThrows(RuntimeException.class, () -> {
        new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
            FileFormat.NONE, "file:///NON_EXIST_FILE");
      });
      assertEquals("illegal file format id: -1", e.getMessage());
    }
  }

  /** Closing a factory twice must be idempotent. */
  @Test
  public void testCloseAgain() {
    assertDoesNotThrow(() -> {
      try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
        FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
            NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///NON_EXIST_FILE");
        factory.close();
        factory.close();
      }
    });
  }
}
--- /dev/null +++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.junit.Assert; + +public abstract class TestNativeDataset extends TestDataset { + protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) { + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + Assert.assertEquals(1L, stream(scanner.scan()).count()); + } +} diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java new file mode 100644 index 000000000..2bc1b9a41 --- /dev/null +++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.arrow.dataset.jni;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.dataset.ParquetWriteSupport;
import org.apache.arrow.dataset.TestDataset;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests that native-side memory reservations made through a listenable
 * {@link NativeMemoryPool} are reported to the attached {@link ReservationListener}
 * and fully released once the scan results and the pool are closed.
 */
public class TestReservationListener extends TestDataset {

  @ClassRule
  public static final TemporaryFolder TMP = new TemporaryFolder();

  public static final String AVRO_SCHEMA_USER = "user.avsc";

  /** Reservations routed to the JVM direct-memory accounting must balance out to zero. */
  @Test
  public void testDirectReservationListener() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
    NativeMemoryPool pool = NativeMemoryPool.createListenable(DirectReservationListener.instance());
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(),
        pool, FileFormat.PARQUET,
        writeSupport.getOutputURI());
    // NOTE(review): factory is not closed here; consider closing it once batches are released.
    ScanOptions options = new ScanOptions(100);
    long initReservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
    long reservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
    AutoCloseables.close(datum);
    AutoCloseables.close(pool);
    long finalReservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
    // Scanning may or may not reserve (a tiny file can fit in pre-reserved space), so only a
    // lower bound is asserted mid-scan; the final state must return exactly to the baseline.
    Assert.assertTrue(reservation >= initReservation);
    Assert.assertEquals(initReservation, finalReservation);
  }

  /** Same balance property, observed through a user-supplied listener. */
  @Test
  public void testCustomReservationListener() throws Exception {
    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
    final AtomicLong reserved = new AtomicLong(0L);
    ReservationListener listener = new ReservationListener() {
      @Override
      public void reserve(long size) {
        reserved.getAndAdd(size);
      }

      @Override
      public void unreserve(long size) {
        reserved.getAndAdd(-size);
      }
    };
    NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(),
        pool, FileFormat.PARQUET, writeSupport.getOutputURI());
    // NOTE(review): factory is not closed here; consider closing it once batches are released.
    ScanOptions options = new ScanOptions(100);
    long initReservation = reserved.get();
    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
    long reservation = reserved.get();
    AutoCloseables.close(datum);
    AutoCloseables.close(pool);
    long finalReservation = reserved.get();
    Assert.assertTrue(reservation >= initReservation);
    // assertEquals (not assertTrue on ==) so a failure reports both values.
    Assert.assertEquals(initReservation, finalReservation);
  }
}
/dev/null +++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNativeUnderlyingMemory { + + private RootAllocator allocator = null; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + protected RootAllocator rootAllocator() { + return allocator; + } + + @Test + public void testReservation() { + final RootAllocator root = rootAllocator(); + + final int size = 512; + final AllocationManager am = new MockUnderlyingMemory(root, size); + final BufferLedger ledger = am.associate(root); + + assertEquals(size, root.getAllocatedMemory()); + + ledger.release(); + } + + @Test + public void testBufferTransfer() { + final RootAllocator root = rootAllocator(); + + ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE); + ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, 
Long.MAX_VALUE); + assertEquals(0, allocator1.getAllocatedMemory()); + assertEquals(0, allocator2.getAllocatedMemory()); + + final int size = 512; + final AllocationManager am = new MockUnderlyingMemory(allocator1, size); + + final BufferLedger owningLedger = am.associate(allocator1); + assertEquals(size, owningLedger.getAccountedSize()); + assertEquals(size, owningLedger.getSize()); + assertEquals(size, allocator1.getAllocatedMemory()); + + final BufferLedger transferredLedger = am.associate(allocator2); + owningLedger.release(); // release previous owner + assertEquals(0, owningLedger.getAccountedSize()); + assertEquals(size, owningLedger.getSize()); + assertEquals(size, transferredLedger.getAccountedSize()); + assertEquals(size, transferredLedger.getSize()); + assertEquals(0, allocator1.getAllocatedMemory()); + assertEquals(size, allocator2.getAllocatedMemory()); + + transferredLedger.release(); + allocator1.close(); + allocator2.close(); + } + + /** + * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations. + */ + private static class MockUnderlyingMemory extends NativeUnderlyingMemory { + + /** + * Constructor. + */ + MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) { + super(accountingAllocator, size, -1L, -1L); + } + + @Override + protected void release0() { + System.out.println("Underlying memory released. Size: " + getSize()); + } + + @Override + protected long memoryAddress() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc b/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc new file mode 100644 index 000000000..072b64391 --- /dev/null +++ b/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.dataset", + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": ["int", "null"]}, + {"name": "name", "type": ["string", "null"]} + ] +} |