summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/dataset
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/java/dataset')
-rw-r--r--src/arrow/java/dataset/CMakeLists.txt43
-rw-r--r--src/arrow/java/dataset/pom.xml134
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java36
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java38
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java47
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java97
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java94
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java107
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java53
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java56
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java104
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java31
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java76
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java106
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java46
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java170
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java36
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java72
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java42
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java41
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java35
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java51
-rw-r--r--src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java81
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java123
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java97
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java338
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java48
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java33
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java88
-rw-r--r--src/arrow/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java110
-rw-r--r--src/arrow/java/dataset/src/test/resources/avroschema/user.avsc26
31 files changed, 2459 insertions, 0 deletions
diff --git a/src/arrow/java/dataset/CMakeLists.txt b/src/arrow/java/dataset/CMakeLists.txt
new file mode 100644
index 000000000..07e2d0ae8
--- /dev/null
+++ b/src/arrow/java/dataset/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_dataset_java
+#
+
+# Headers: top level
+
+project(arrow_dataset_java)
+
+# Find java/jni
+include(FindJava)
+include(UseJava)
+include(FindJNI)
+
+message("generating headers to ${JNI_HEADERS_DIR}")
+
+add_jar(arrow_dataset_java
+ src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
+ src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+ src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
+ src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+ src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
+ src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
+ GENERATE_NATIVE_HEADERS
+ arrow_dataset_java-native
+ DESTINATION
+ ${JNI_HEADERS_DIR})
diff --git a/src/arrow/java/dataset/pom.xml b/src/arrow/java/dataset/pom.xml
new file mode 100644
index 000000000..0a393c50a
--- /dev/null
+++ b/src/arrow/java/dataset/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>arrow-java-root</artifactId>
+ <groupId>org.apache.arrow</groupId>
+ <version>6.0.1</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>arrow-dataset</artifactId>
+ <name>Arrow Java Dataset</name>
+ <description>Java implementation of Arrow Dataset API/Framework</description>
+ <packaging>jar</packaging>
+ <properties>
+ <arrow.cpp.build.dir>../../../cpp/release-build/</arrow.cpp.build.dir>
+ <protobuf.version>2.5.0</protobuf.version>
+ <parquet.version>1.11.0</parquet.version>
+ <avro.version>1.8.2</avro.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <classifier>${arrow.vector.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${dep.hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${dep.guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>${arrow.cpp.build.dir}</directory>
+ <includes>
+ <include>**/libarrow_dataset_jni.*</include>
+ </includes>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <protoSourceRoot>../../cpp/src/jni/dataset/proto</protoSourceRoot>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>test-compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
new file mode 100644
index 000000000..e341d46be
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+/**
+ * File format definitions.
+ */
+public enum FileFormat {
+ PARQUET(0),
+ NONE(-1);
+
+ private int id;
+
+ FileFormat(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
new file mode 100644
index 000000000..1268d11fe
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.dataset.jni.NativeDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.memory.BufferAllocator;
+
+/**
+ * Java binding of the C++ FileSystemDatasetFactory.
+ */
+public class FileSystemDatasetFactory extends NativeDatasetFactory {
+
+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
+ String uri) {
+ super(allocator, memoryPool, createNative(format, uri));
+ }
+
+ private static long createNative(FileFormat format, String uri) {
+ return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
+ }
+
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
new file mode 100644
index 000000000..1af307aac
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations.
+ */
+public class JniWrapper {
+
+ private static final JniWrapper INSTANCE = new JniWrapper();
+
+ public static JniWrapper get() {
+ return INSTANCE;
+ }
+
+ private JniWrapper() {
+ JniLoader.get().ensureLoaded();
+ }
+
+ /**
+ * Create FileSystemDatasetFactory and return its native pointer. The pointer points to an
+ * intermediate shared_ptr of the factory instance.
+ * @param uri file uri to read
+ * @param fileFormat file format ID
+ * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
+ * @see FileFormat
+ */
+ public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
+
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
new file mode 100644
index 000000000..72a1cadcf
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.util.VisibleForTesting;
+
+/**
+ * Reserving Java direct memory bytes from java.nio.Bits. Used by Java Dataset API's C++ memory
+ * pool implementation. This makes memory allocated by the pool to be controlled by JVM option
+ * "-XX:MaxDirectMemorySize".
+ */
+public class DirectReservationListener implements ReservationListener {
+ private final Method methodReserve;
+ private final Method methodUnreserve;
+
+ private DirectReservationListener() {
+ try {
+ final Class<?> classBits = Class.forName("java.nio.Bits");
+ methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
+ methodReserve.setAccessible(true);
+ methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
+ methodUnreserve.setAccessible(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final DirectReservationListener INSTANCE = new DirectReservationListener();
+
+ public static DirectReservationListener instance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Reserve bytes by invoking java.nio.Bits#reserveMemory.
+ */
+ @Override
+ public void reserve(long size) {
+ try {
+ if (size > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+ }
+ methodReserve.invoke(null, (int) size, (int) size);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Unreserve bytes by invoking java.nio.Bits#unreserveMemory.
+ */
+ @Override
+ public void unreserve(long size) {
+ try {
+ if (size > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+ }
+ methodUnreserve.invoke(null, (int) size, (int) size);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Get current reservation of JVM direct memory. Visible for testing.
+ */
+ @VisibleForTesting
+ public long getCurrentDirectMemReservation() {
+ try {
+ final Class<?> classBits = Class.forName("java.nio.Bits");
+ final Field f = classBits.getDeclaredField("reservedMemory");
+ f.setAccessible(true);
+ return ((AtomicLong) f.get(null)).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
new file mode 100644
index 000000000..15ce5448b
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * The JniLoader for Dataset API's native implementation.
+ */
+public final class JniLoader {
+
+ private static final JniLoader INSTANCE = new JniLoader(Collections.singletonList("arrow_dataset_jni"));
+
+ public static JniLoader get() {
+ return INSTANCE;
+ }
+
+ private final Set<String> librariesToLoad;
+
+ private JniLoader(List<String> libraryNames) {
+ librariesToLoad = new HashSet<>(libraryNames);
+ }
+
+ private boolean finished() {
+ return librariesToLoad.isEmpty();
+ }
+
+ /**
+ * If required JNI libraries are not loaded, then load them.
+ */
+ public void ensureLoaded() {
+ if (finished()) {
+ return;
+ }
+ loadRemaining();
+ }
+
+ private synchronized void loadRemaining() {
+ // This method is protected by a mutex via synchronized. If more than one thread races to call
+ // loadRemaining, only one at a time performs the actual loading while the others wait for
+ // the mutex to be acquired, then check the remaining list: if some libraries were not
+ // successfully loaded, the mutex owner will try to load them again.
+ if (finished()) {
+ return;
+ }
+ List<String> libs = new ArrayList<>(librariesToLoad);
+ for (String lib : libs) {
+ load(lib);
+ librariesToLoad.remove(lib);
+ }
+ }
+
+ private void load(String name) {
+ final String libraryToLoad = System.mapLibraryName(name);
+ try {
+ File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir")));
+ try (final InputStream is
+ = JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
+ if (is == null) {
+ throw new FileNotFoundException(libraryToLoad);
+ }
+ Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ System.load(temp.getAbsolutePath());
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("error loading native libraries: " + e);
+ }
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
new file mode 100644
index 000000000..7dd54e764
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+/**
+ * JNI wrapper for Dataset API's native implementation.
+ */
+public class JniWrapper {
+
+ private static final JniWrapper INSTANCE = new JniWrapper();
+
+ public static JniWrapper get() {
+ return INSTANCE;
+ }
+
+ private JniWrapper() {
+ JniLoader.get().ensureLoaded();
+ }
+
+ /**
+ * Release the DatasetFactory by destroying its reference held by JNI wrapper.
+ *
+ * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance.
+ */
+ public native void closeDatasetFactory(long datasetFactoryId);
+
+ /**
+ * Get a serialized schema from native instance of a DatasetFactory.
+ *
+ * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance.
+ * @return the serialized schema
+ * @see org.apache.arrow.vector.types.pojo.Schema
+ */
+ public native byte[] inspectSchema(long datasetFactoryId);
+
+ /**
+ * Create Dataset from a DatasetFactory and get the native pointer of the Dataset.
+ *
+ * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance.
+ * @param schema the predefined schema of the resulting Dataset.
+ * @return the native pointer of the arrow::dataset::Dataset instance.
+ */
+ public native long createDataset(long datasetFactoryId, byte[] schema);
+
+ /**
+ * Release the Dataset by destroying its reference held by JNI wrapper.
+ *
+ * @param datasetId the native pointer of the arrow::dataset::Dataset instance.
+ */
+ public native void closeDataset(long datasetId);
+
+ /**
+ * Create a Scanner from a Dataset and get the native pointer of the Scanner.
+ * @param datasetId the native pointer of the arrow::dataset::Dataset instance.
+ * @param columns desired column names.
+ * Columns not in this list will not be emitted when performing scan operation. Null is
+ * equivalent to "all columns".
+ * @param batchSize batch size of scanned record batches.
+ * @param memoryPool identifier of memory pool used in the native scanner.
+ * @return the native pointer of the arrow::dataset::Scanner instance.
+ */
+ public native long createScanner(long datasetId, String[] columns, long batchSize, long memoryPool);
+
+ /**
+ * Get a serialized schema from native instance of a Scanner.
+ *
+ * @param scannerId the native pointer of the arrow::dataset::Scanner instance.
+ * @return the serialized schema
+ * @see org.apache.arrow.vector.types.pojo.Schema
+ */
+ public native byte[] getSchemaFromScanner(long scannerId);
+
+ /**
+ * Release the Scanner by destroying its reference held by JNI wrapper.
+ * @param scannerId the native pointer of the arrow::dataset::Scanner instance.
+ */
+ public native void closeScanner(long scannerId);
+
+ /**
+ * Read next record batch from the specified scanner.
+ * @param scannerId the native pointer of the arrow::dataset::Scanner instance.
+ * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch.
+ */
+ public native NativeRecordBatchHandle nextRecordBatch(long scannerId);
+
+ /**
+ * Release the Buffer by destroying its reference held by JNI wrapper.
+ * @param bufferId the native pointer of the arrow::Buffer instance.
+ */
+ public native void releaseBuffer(long bufferId);
+
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java
new file mode 100644
index 000000000..7f6dfbc02
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import org.apache.arrow.memory.BufferAllocator;
+
+/**
+ * Context for relevant classes of NativeDataset.
+ */
+public class NativeContext {
+ private final BufferAllocator allocator;
+ private final NativeMemoryPool memoryPool;
+
+ /**
+ * Constructor.
+ *
+ * @param allocator The allocator in use.
+ * @param memoryPool Native memory pool.
+ */
+ public NativeContext(BufferAllocator allocator, NativeMemoryPool memoryPool) {
+ this.allocator = allocator;
+ this.memoryPool = memoryPool;
+ }
+
+ /**
+ * Returns the allocator which is in use.
+ */
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ /**
+ * Returns the native memory pool.
+ */
+ public NativeMemoryPool getMemoryPool() {
+ return memoryPool;
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
new file mode 100644
index 000000000..30ff1a930
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.source.Dataset;
+
+/**
+ * Native implementation of {@link Dataset}.
+ */
+public class NativeDataset implements Dataset {
+
+ private final NativeContext context;
+ private final long datasetId;
+
+ private boolean closed = false;
+
+ public NativeDataset(NativeContext context, long datasetId) {
+ this.context = context;
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public synchronized NativeScanner newScan(ScanOptions options) {
+ if (closed) {
+ throw new NativeInstanceReleasedException();
+ }
+ long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null),
+ options.getBatchSize(), context.getMemoryPool().getNativeInstanceId());
+ return new NativeScanner(context, scannerId);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ JniWrapper.get().closeDataset(datasetId);
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java
new file mode 100644
index 000000000..993d44fa2
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.IOException;
+
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.SchemaUtility;
+
/**
 * Native implementation of {@link DatasetFactory}. Wraps a native factory instance
 * addressed by {@code datasetFactoryId} through {@link JniWrapper}.
 */
public class NativeDatasetFactory implements DatasetFactory {
  // Native pointer/ID of the factory instance; passed to all JniWrapper calls.
  private final long datasetFactoryId;
  // Native memory pool shared with datasets created by this factory.
  private final NativeMemoryPool memoryPool;
  // Java-side allocator that natively created buffers will be bound to.
  private final BufferAllocator allocator;

  // Flipped once by close(); every access is under the instance monitor.
  private boolean closed = false;

  /**
   * Constructor.
   *
   * @param allocator a context allocator associated with this factory. Any buffer that will be created natively will
   *                  be then bound to this allocator.
   * @param memoryPool the native memory pool associated with this factory. Any buffer created natively should request
   *                   for memory spaces from this memory pool. This is a mapped instance of c++ arrow::MemoryPool.
   * @param datasetFactoryId an ID, at the same time the native pointer of the underlying native instance of this
   *                         factory. Make sure in c++ side the pointer is pointing to the shared pointer wrapping
   *                         the actual instance so we could successfully decrease the reference count once
   *                         {@link #close} is called.
   * @see #close()
   */
  public NativeDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, long datasetFactoryId) {
    this.allocator = allocator;
    this.memoryPool = memoryPool;
    this.datasetFactoryId = datasetFactoryId;
  }

  /**
   * Returns the unified schema inspected from the underlying data source.
   *
   * <p>The native call runs under the instance monitor so it cannot race with
   * {@link #close()}; the (pure-Java) deserialization happens outside the lock
   * to keep the critical section short.
   *
   * @throws NativeInstanceReleasedException if the factory is already closed
   * @throws RuntimeException wrapping any {@link IOException} raised during deserialization
   */
  @Override
  public Schema inspect() {
    final byte[] buffer;
    synchronized (this) {
      if (closed) {
        throw new NativeInstanceReleasedException();
      }
      buffer = JniWrapper.get().inspectSchema(datasetFactoryId);
    }
    try {
      return SchemaUtility.deserialize(buffer, allocator);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Creates a dataset whose schema is auto-inferred via {@link #inspect()}.
   */
  @Override
  public NativeDataset finish() {
    return finish(inspect());
  }

  /**
   * Creates a dataset bound to the given schema.
   *
   * <p>The schema is serialized before entering the monitor; only the native
   * createDataset call is performed while holding the lock, mirroring {@link #inspect()}.
   *
   * @param schema the predefined schema to use (no inference is performed)
   * @throws NativeInstanceReleasedException if the factory is already closed
   * @throws RuntimeException wrapping any {@link IOException} raised during serialization
   */
  @Override
  public NativeDataset finish(Schema schema) {
    try {
      byte[] serialized = SchemaUtility.serialize(schema);
      synchronized (this) {
        if (closed) {
          throw new NativeInstanceReleasedException();
        }
        return new NativeDataset(new NativeContext(allocator, memoryPool),
            JniWrapper.get().createDataset(datasetFactoryId, serialized));
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Close this factory by release the pointer of the native instance. Idempotent:
   * only the first call releases the native factory.
   */
  @Override
  public synchronized void close() {
    if (closed) {
      return;
    }
    closed = true;
    JniWrapper.get().closeDatasetFactory(datasetFactoryId);
  }
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java
new file mode 100644
index 000000000..3231ca23a
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
/**
 * Indicates that an operation was attempted on a native instance whose underlying
 * resources have already been released.
 */
public class NativeInstanceReleasedException extends RuntimeException {

  private static final String DEFAULT_MESSAGE = "Native instance has been released";

  /** Creates the exception with the default message. */
  public NativeInstanceReleasedException() {
    this(DEFAULT_MESSAGE);
  }

  /**
   * Creates the exception with a custom message.
   *
   * @param message detail message describing which instance was released
   */
  public NativeInstanceReleasedException(String message) {
    super(message);
  }
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
new file mode 100644
index 000000000..83825776b
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+/**
+ * C++ memory pool(arrow::MemoryPool)'s Java mapped instance.
+ */
+public class NativeMemoryPool implements AutoCloseable {
+ private final long nativeInstanceId;
+
+ static {
+ JniLoader.get().ensureLoaded();
+ }
+
+ private NativeMemoryPool(long nativeInstanceId) {
+ this.nativeInstanceId = nativeInstanceId;
+ }
+
+ /**
+ * Get the default memory pool. This will return arrow::default_memory_pool() directly.
+ */
+ public static NativeMemoryPool getDefault() {
+ return new NativeMemoryPool(getDefaultMemoryPool());
+ }
+
+ /**
+ * Create a listenable memory pool (see also: arrow::ReservationListenableMemoryPool) with
+ * a specific listener. All buffers created from the memory pool should take enough reservation
+ * from the listener in advance.
+ */
+ public static NativeMemoryPool createListenable(ReservationListener listener) {
+ return new NativeMemoryPool(createListenableMemoryPool(listener));
+ }
+
+ /**
+ * Return native instance ID of this memory pool.
+ */
+ public long getNativeInstanceId() {
+ return nativeInstanceId;
+ }
+
+ /**
+ * Get current allocated bytes.
+ */
+ public long getBytesAllocated() {
+ return bytesAllocated(nativeInstanceId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ releaseMemoryPool(nativeInstanceId);
+ }
+
+ private static native long getDefaultMemoryPool();
+
+ private static native long createListenableMemoryPool(ReservationListener listener);
+
+ private static native void releaseMemoryPool(long id);
+
+ private static native long bytesAllocated(long id);
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
new file mode 100644
index 000000000..dd90fd1c1
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.util.Arrays;
+import java.util.List;
+
/**
 * Hold pointers to a Arrow C++ RecordBatch.
 */
public class NativeRecordBatchHandle {

  private final long numRows;
  private final List<Field> fields;
  private final List<Buffer> buffers;

  /**
   * Constructor.
   *
   * @param numRows Total row number of the associated RecordBatch
   * @param fields Metadata of fields
   * @param buffers Retained Arrow buffers
   */
  public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) {
    this.numRows = numRows;
    // Defensive copies: Arrays.asList alone would be a view backed by the caller's
    // arrays, so later mutation of those arrays would silently alter this handle.
    // The lists are also made unmodifiable since the handle is read-only metadata.
    this.fields = java.util.Collections.unmodifiableList(Arrays.asList(fields.clone()));
    this.buffers = java.util.Collections.unmodifiableList(Arrays.asList(buffers.clone()));
  }

  /**
   * Returns the total row number of the associated RecordBatch.
   * @return Total row number of the associated RecordBatch.
   */
  public long getNumRows() {
    return numRows;
  }

  /**
   * Returns Metadata of fields.
   * @return Metadata of fields (unmodifiable).
   */
  public List<Field> getFields() {
    return fields;
  }

  /**
   * Returns the buffers.
   * @return Retained Arrow buffers (unmodifiable).
   */
  public List<Buffer> getBuffers() {
    return buffers;
  }

  /**
   * Field metadata.
   */
  public static class Field {
    // Value count of the field.
    public final long length;
    // Number of null values in the field.
    public final long nullCount;

    public Field(long length, long nullCount) {
      this.length = length;
      this.nullCount = nullCount;
    }
  }

  /**
   * Pointers and metadata of the targeted Arrow buffer.
   */
  public static class Buffer {
    public final long nativeInstanceId;
    public final long memoryAddress;
    public final long size;
    public final long capacity;

    /**
     * Constructor.
     *
     * @param nativeInstanceId Native instance's id
     * @param memoryAddress Memory address of the first byte
     * @param size Size (in bytes)
     * @param capacity Capacity (in bytes)
     */
    public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) {
      this.nativeInstanceId = nativeInstanceId;
      this.memoryAddress = memoryAddress;
      this.size = size;
      this.capacity = capacity;
    }
  }
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
new file mode 100644
index 000000000..14d89c2ee
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import org.apache.arrow.dataset.scanner.ScanTask;
+
+/**
+ * Native implementation of {@link ScanTask}. Currently RecordBatches are iterated directly by the scanner
+ * id via {@link JniWrapper}, thus we allow only one-time execution of method {@link #execute()}. If a re-scan
+ * operation is expected, call {@link NativeDataset#newScan} to create a new scanner instance.
+ */
+public class NativeScanTask implements ScanTask {
+ private final NativeScanner scanner;
+
+ /**
+ * Constructor.
+ */
+ public NativeScanTask(NativeScanner scanner) {
+ this.scanner = scanner;
+ }
+
+ @Override
+ public BatchIterator execute() {
+ return scanner.execute();
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
new file mode 100644
index 000000000..24c298067
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.BufferLedger;
+import org.apache.arrow.memory.NativeUnderlyingMemory;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.SchemaUtility;
+
/**
 * Native implementation of {@link Scanner}. Note that it currently emits only a single scan task of type
 * {@link NativeScanTask}, which is internally a combination of all scan task instances returned by the
 * native scanner.
 */
public class NativeScanner implements Scanner {

  // Ensures execute() is entered at most once: the native batch stream cannot be rewound.
  private final AtomicBoolean executed = new AtomicBoolean(false);
  // Carries the Java allocator used to wrap native buffers.
  private final NativeContext context;
  // Native pointer/ID of the underlying scanner, passed to all JniWrapper calls.
  private final long scannerId;

  // JNI calls that require the native scanner to stay alive (nextRecordBatch,
  // getSchemaFromScanner) take the read lock; close() takes the write lock so the
  // native instance cannot be released while such a call is in flight.
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final Lock writeLock = lock.writeLock();
  private final Lock readLock = lock.readLock();
  // Set once by close(); read under the read lock in hasNext()/schema().
  private boolean closed = false;

  /**
   * Constructor.
   *
   * @param context the native context holding the allocator and memory pool
   * @param scannerId native pointer/ID of the scanner created via {@link JniWrapper}
   */
  public NativeScanner(NativeContext context, long scannerId) {
    this.context = context;
    this.scannerId = scannerId;
  }

  /**
   * Returns a one-shot iterator over the record batches produced by the native scanner.
   * A second invocation throws {@link UnsupportedOperationException}.
   */
  ScanTask.BatchIterator execute() {
    // NOTE(review): this `closed` read is not performed under the read lock, so a
    // close() racing with execute() may be missed here; the per-batch check inside
    // hasNext() is properly locked and still catches it.
    if (closed) {
      throw new NativeInstanceReleasedException();
    }
    if (!executed.compareAndSet(false, true)) {
      throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " +
          "new scanner instead");
    }
    return new ScanTask.BatchIterator() {
      // Batch pre-fetched by hasNext() and handed out by the following next() call.
      private ArrowRecordBatch peek = null;

      @Override
      public void close() {
        NativeScanner.this.close();
      }

      @Override
      public boolean hasNext() {
        if (peek != null) {
          return true;
        }
        final NativeRecordBatchHandle handle;
        // Hold the read lock across the JNI call so close() cannot release the
        // native scanner while a batch is being fetched.
        readLock.lock();
        try {
          if (closed) {
            throw new NativeInstanceReleasedException();
          }
          handle = JniWrapper.get().nextRecordBatch(scannerId);
        } finally {
          readLock.unlock();
        }
        if (handle == null) {
          // End of the native batch stream.
          return false;
        }
        // Wrap each native buffer into an ArrowBuf accounted by the Java allocator.
        final ArrayList<ArrowBuf> buffers = new ArrayList<>();
        for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) {
          final BufferAllocator allocator = context.getAllocator();
          final int size = LargeMemoryUtil.checkedCastToInt(buffer.size);
          final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
              size, buffer.nativeInstanceId, buffer.memoryAddress);
          BufferLedger ledger = am.associate(allocator);
          ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress);
          buffers.add(buf);
        }

        try {
          final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows());
          peek = new ArrowRecordBatch(numRows, handle.getFields().stream()
              .map(field -> new ArrowFieldNode(field.length, field.nullCount))
              .collect(Collectors.toList()), buffers);
          return true;
        } finally {
          // Presumably ArrowRecordBatch retains the buffers it is constructed with, so
          // the local references are released here to keep reference counts balanced
          // (and to free the buffers if the constructor threw) — TODO confirm against
          // ArrowRecordBatch's retain semantics.
          buffers.forEach(buffer -> buffer.getReferenceManager().release());
        }
      }

      @Override
      public ArrowRecordBatch next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }

        // Hand the pre-fetched batch to the caller and clear the slot so the
        // next hasNext() fetches a fresh one.
        try {
          return peek;
        } finally {
          peek = null;
        }
      }
    };
  }

  /**
   * Returns a single combined scan task wrapping this scanner; see the class-level javadoc.
   */
  @Override
  public Iterable<? extends NativeScanTask> scan() {
    if (closed) {
      throw new NativeInstanceReleasedException();
    }
    return Collections.singletonList(new NativeScanTask(this));
  }

  /**
   * Returns the schema of this scanner, deserialized from the bytes produced by the native side.
   *
   * @throws NativeInstanceReleasedException if the scanner is already closed
   * @throws RuntimeException wrapping any {@link IOException} raised during deserialization
   */
  @Override
  public Schema schema() {
    readLock.lock();
    try {
      if (closed) {
        throw new NativeInstanceReleasedException();
      }
      return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator());
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Releases the native scanner. Takes the write lock, i.e. waits for in-flight JNI
   * calls to finish before the native instance goes away. Idempotent.
   */
  @Override
  public void close() {
    writeLock.lock();
    try {
      if (closed) {
        return;
      }
      closed = true;
      JniWrapper.get().closeScanner(scannerId);
    } finally {
      writeLock.unlock();
    }
  }
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
new file mode 100644
index 000000000..f1ffdd2ac
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
/**
 * Listener of buffer memory reservation. Used by native datasets: the native-side
 * memory pool calls back into these methods around its allocations.
 */
public interface ReservationListener {

  /**
   * Reserve bytes.
   *
   * @param size number of bytes to reserve
   * @throws RuntimeException if request size cannot be granted
   */
  void reserve(long size);

  /**
   * Unreserve bytes.
   *
   * @param size number of bytes to hand back
   */
  void unreserve(long size);
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
new file mode 100644
index 000000000..f5a1af384
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import java.util.Optional;
+
+import org.apache.arrow.util.Preconditions;
+
+/**
+ * Options used during scanning.
+ */
+public class ScanOptions {
+ private final Optional<String[]> columns;
+ private final long batchSize;
+
+ /**
+ * Constructor.
+ * @param columns Projected columns. Empty for scanning all columns.
+ * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ *
+ * @deprecated Deprecated. Use {@link #ScanOptions(long, Optional)} instead.
+ */
+ @Deprecated
+ public ScanOptions(String[] columns, long batchSize) {
+ this(batchSize, Optional.of(columns).map(present -> {
+ if (present.length == 0) {
+ // Backwards compatibility: See ARROW-13257, in the new constructor, we now use null to scan for all columns.
+ return null;
+ }
+ return present;
+ }));
+ }
+
+ /**
+ * Constructor.
+ * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
+ * @param columns (Optional) Projected columns. {@link Optional#empty()} for scanning all columns. Otherwise,
+ * Only columns present in the Array will be scanned.
+ */
+ public ScanOptions(long batchSize, Optional<String[]> columns) {
+ Preconditions.checkNotNull(columns);
+ this.batchSize = batchSize;
+ this.columns = columns;
+ }
+
+ public ScanOptions(long batchSize) {
+ this(batchSize, Optional.empty());
+ }
+
+ public Optional<String[]> getColumns() {
+ return columns;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
new file mode 100644
index 000000000..d07036a61
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import java.util.Iterator;
+
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
/**
 * Read record batches from a range of a single data fragment. A
 * ScanTask is meant to be a unit of work to be dispatched. The implementation
 * must be thread and concurrent safe.
 */
public interface ScanTask extends AutoCloseable {

  /**
   * Creates and returns a {@link BatchIterator} instance.
   *
   * @return the iterator over this task's record batches
   */
  BatchIterator execute();

  /**
   * The iterator implementation for {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}s.
   * Extends {@link AutoCloseable} so underlying resources can be released even when
   * iteration stops early.
   */
  interface BatchIterator extends Iterator<ArrowRecordBatch>, AutoCloseable {

  }
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java
new file mode 100644
index 000000000..93a1b08f3
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+
/**
 * A high level interface for scanning data over dataset.
 */
public interface Scanner extends AutoCloseable {

  /**
   * Perform the scan operation.
   *
   * @return an iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed
   *         to execute the tasks concurrently to gain better performance.
   */
  Iterable<? extends ScanTask> scan();

  /**
   * Get the schema of this Scanner.
   *
   * @return the schema instance
   */
  Schema schema();
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java
new file mode 100644
index 000000000..ce193581f
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.source;
+
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+
/**
 * A container of Fragments which are the internal iterable unit of read data.
 * Closing a dataset releases whatever resources back the implementation.
 */
public interface Dataset extends AutoCloseable {

  /**
   * Create a new Scanner using the provided scan options.
   *
   * @param options options used during creating Scanner
   * @return the Scanner instance
   */
  Scanner newScan(ScanOptions options);
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java
new file mode 100644
index 000000000..46b8545d6
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.source;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+
/**
 * DatasetFactory provides a way to inspect a Dataset's potential
 * schema before materializing it. Thus, the user can peek the schema for
 * data sources and decide on a unified schema.
 */
public interface DatasetFactory extends AutoCloseable {

  /**
   * Get unified schema for the resulting Dataset.
   *
   * @return the schema object inspected
   */
  Schema inspect();

  /**
   * Create a Dataset with auto-inferred schema. Which means, the schema of the resulting Dataset will be
   * the same with calling {@link #inspect()} manually.
   *
   * @return the Dataset instance
   */
  Dataset finish();

  /**
   * Create a Dataset with predefined schema. Schema inference will not be performed.
   *
   * @param schema a predefined schema
   * @return the Dataset instance
   */
  Dataset finish(Schema schema);
}
diff --git a/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
new file mode 100644
index 000000000..963fb6170
--- /dev/null
+++ b/src/arrow/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import org.apache.arrow.dataset.jni.JniWrapper;
+
+/**
+ * AllocationManager implementation for native allocated memory.
+ *
+ * <p>Wraps a buffer that was allocated by native (JNI) code: the Java side only
+ * accounts for the bytes against the given allocator and releases the native
+ * instance when the last ledger reference goes away.
+ */
+public class NativeUnderlyingMemory extends AllocationManager {
+
+  // Size of the underlying native buffer in bytes; also the amount reserved
+  // on the accounting allocator in the constructor.
+  private final int size;
+  // Opaque handle identifying the native buffer; passed back to JNI on release.
+  private final long nativeInstanceId;
+  // Raw memory address of the native buffer.
+  private final long address;
+
+  /**
+   * Constructor.
+   *
+   * @param accountingAllocator The accounting allocator instance
+   * @param size Size of underlying memory (in bytes)
+   * @param nativeInstanceId ID of the native instance
+   * @param address Memory address of the underlying native buffer
+   */
+  NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) {
+    super(accountingAllocator);
+    this.size = size;
+    this.nativeInstanceId = nativeInstanceId;
+    this.address = address;
+    // pre-allocate bytes on accounting allocator
+    final AllocationListener listener = accountingAllocator.getListener();
+    try (final AllocationReservation reservation = accountingAllocator.newReservation()) {
+      listener.onPreAllocation(size);
+      reservation.reserve(size);
+      listener.onAllocation(size);
+    } catch (Exception e) {
+      // Accounting failed: free the native buffer before propagating, so the
+      // JNI-side allocation is not leaked.
+      release0();
+      throw e;
+    }
+  }
+
+  /**
+   * Alias to constructor.
+   */
+  public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId,
+      long address) {
+    return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address);
+  }
+
+  // Widens the visibility of AllocationManager#associate so callers outside
+  // the memory package can create a ledger for this chunk.
+  public BufferLedger associate(BufferAllocator allocator) {
+    return super.associate(allocator);
+  }
+
+  /** Releases the native buffer through JNI; called when the last reference drops. */
+  @Override
+  protected void release0() {
+    JniWrapper.get().releaseBuffer(nativeInstanceId);
+  }
+
+  @Override
+  public long getSize() {
+    return size;
+  }
+
+  @Override
+  protected long memoryAddress() {
+    return address;
+  }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
new file mode 100644
index 000000000..c6299d135
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.util.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+
+/**
+ * Utility class for writing Parquet files using Avro based tools.
+ *
+ * <p>Records written through this class are also retained in memory so tests
+ * can compare what was written against what a reader later produces.
+ */
+public class ParquetWriteSupport implements AutoCloseable {
+
+  private final String path;
+  private final String uri;
+  private final ParquetWriter<GenericRecord> writer;
+  private final Schema avroSchema;
+  private final List<GenericRecord> writtenRecords = new ArrayList<>();
+  private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
+
+  /**
+   * Constructor.
+   *
+   * @param schemaName name of an Avro schema resource under "/avroschema" on the classpath
+   * @param outputFolder folder that receives the generated "generated.parquet" file
+   */
+  public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
+    avroSchema = readSchemaFromFile(schemaName);
+    path = outputFolder.getPath() + File.separator + "generated.parquet";
+    uri = "file://" + path;
+    writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
+        .withSchema(avroSchema)
+        .build();
+  }
+
+  private static Schema readSchemaFromFile(String schemaName) throws Exception {
+    // Resolve the schema location via toURI() rather than URL.getPath():
+    // getPath() returns a percent-encoded string and breaks when the
+    // classpath directory contains spaces or other URL-encoded characters.
+    Path schemaPath = Paths.get(ParquetWriteSupport.class.getResource("/").toURI())
+        .resolve("avroschema")
+        .resolve(schemaName);
+    return new org.apache.avro.Schema.Parser().parse(schemaPath.toFile());
+  }
+
+  /**
+   * Writes the given flat list of field values as records, closes the writer,
+   * and returns the (closed) support object so callers can inspect the
+   * written records and output URI.
+   */
+  public static ParquetWriteSupport writeTempFile(String schemaName, File outputFolder,
+      Object... values) throws Exception {
+    try (final ParquetWriteSupport writeSupport = new ParquetWriteSupport(schemaName, outputFolder)) {
+      writeSupport.writeRecords(values);
+      return writeSupport;
+    }
+  }
+
+  /** Writes records built from a flat list of field values (row-major). */
+  public void writeRecords(Object... values) throws Exception {
+    final List<GenericRecord> valueList = getRecordListBuilder().createRecordList(values);
+    writeRecords(valueList);
+  }
+
+  /** Writes each record in order, remembering it for later comparison. */
+  public void writeRecords(List<GenericRecord> records) throws Exception {
+    for (GenericRecord record : records) {
+      writeRecord(record);
+    }
+  }
+
+  /** Writes a single record, remembering it for later comparison. */
+  public void writeRecord(GenericRecord record) throws Exception {
+    writtenRecords.add(record);
+    writer.write(record);
+  }
+
+  /** Returns the "file://" URI of the generated Parquet file. */
+  public String getOutputURI() {
+    return uri;
+  }
+
+  public Schema getAvroSchema() {
+    return avroSchema;
+  }
+
+  public GenericRecordListBuilder getRecordListBuilder() {
+    return recordListBuilder;
+  }
+
+  /** Returns an unmodifiable view of every record written so far. */
+  public List<GenericRecord> getWrittenRecords() {
+    return Collections.unmodifiableList(writtenRecords);
+  }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
+
+  /** Builds lists of {@link GenericRecord}s from flat value arrays using the outer schema. */
+  public class GenericRecordListBuilder {
+    /**
+     * Groups {@code values} row-major into records: every consecutive run of
+     * {@code fieldCount} values becomes one record.
+     */
+    public final List<GenericRecord> createRecordList(Object... values) {
+      final int fieldCount = avroSchema.getFields().size();
+      Preconditions.checkArgument(values.length % fieldCount == 0,
+          "value count must be a multiple of the schema's field count");
+      final List<GenericRecord> recordList = new ArrayList<>(values.length / fieldCount);
+      for (int i = 0; i < values.length / fieldCount; i++) {
+        final GenericRecord record = new GenericData.Record(avroSchema);
+        for (int j = 0; j < fieldCount; j++) {
+          record.put(j, values[i * fieldCount + j]);
+        }
+        recordList.add(record);
+      }
+      return Collections.unmodifiableList(recordList);
+    }
+  }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
new file mode 100644
index 000000000..51dac15e5
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for dataset tests: manages a per-test root allocator and offers
+ * helpers to scan a {@link DatasetFactory} into record batches or a schema.
+ */
+public abstract class TestDataset {
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  /** Returns the per-test root allocator (recreated before each test method). */
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  /**
+   * Builds a Dataset from {@code factory}, scans it with {@code options} and
+   * collects every resulting record batch. The caller owns (and must close)
+   * the returned batches.
+   */
+  protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory, ScanOptions options) {
+    final Dataset dataset = factory.finish();
+    final Scanner scanner = dataset.newScan(options);
+    try {
+      return stream(scanner.scan())
+          .flatMap(t -> stream(t.execute()))
+          .collect(Collectors.toList());
+    } finally {
+      // Close even if scanning throws; the previous version only closed on the
+      // success path and leaked the native scanner/dataset handles on error.
+      closeOrThrow(scanner, dataset);
+    }
+  }
+
+  /** Builds a Dataset from {@code factory} and returns the scan result schema. */
+  protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) {
+    final Dataset dataset = factory.finish();
+    final Scanner scanner = dataset.newScan(options);
+    try {
+      return scanner.schema();
+    } finally {
+      closeOrThrow(scanner, dataset);
+    }
+  }
+
+  // Wraps checked close failures in an unchecked exception so the helpers
+  // above can keep their original (no-throws) signatures.
+  private static void closeOrThrow(AutoCloseable... closeables) {
+    try {
+      AutoCloseables.close(closeables);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected <T> Stream<T> stream(Iterable<T> iterable) {
+    return StreamSupport.stream(iterable.spliterator(), false);
+  }
+
+  protected <T> List<T> collect(Iterable<T> iterable) {
+    return stream(iterable).collect(Collectors.toList());
+  }
+
+  protected <T> Stream<T> stream(Iterator<T> iterator) {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
+  }
+
+  protected <T> List<T> collect(Iterator<T> iterator) {
+    return stream(iterator).collect(Collectors.toList());
+  }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
new file mode 100644
index 000000000..2b99f8283
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.jni.NativeDataset;
+import org.apache.arrow.dataset.jni.NativeInstanceReleasedException;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.jni.NativeScanTask;
+import org.apache.arrow.dataset.jni.NativeScanner;
+import org.apache.arrow.dataset.jni.TestNativeDataset;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * End-to-end tests for reading Parquet files through FileSystemDatasetFactory.
+ * Test data is generated with {@link ParquetWriteSupport} from the Avro schema
+ * resource {@code user.avsc} (fields: id, name) and read back via the native
+ * dataset/scanner JNI bindings.
+ */
+public class TestFileSystemDataset extends TestNativeDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  // Reads a single-row file and verifies both the inferred schema (id: INT,
+  // name: VARCHAR) and the row data round-trip.
+  @Test
+  public void testBaseParquetRead() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    ScanOptions options = new ScanOptions(100);
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(1, datum.size());
+    assertEquals(2, schema.getFields().size());
+    assertEquals("id", schema.getFields().get(0).getName());
+    assertEquals("name", schema.getFields().get(1).getName());
+    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
+    assertEquals(Types.MinorType.VARCHAR.getType(), schema.getFields().get(1).getType());
+    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
+
+    AutoCloseables.close(datum);
+  }
+
+  // Projecting only "id" must yield a one-field result schema and data limited
+  // to that column.
+  @Test
+  public void testParquetProjectSingleColumn() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    ScanOptions options = new ScanOptions(100, Optional.of(new String[]{"id"}));
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+    org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(1, schema.getFields().size());
+    assertEquals("id", schema.getFields().get(0).getName());
+    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
+    assertEquals(1, datum.size());
+    checkParquetReadResult(schema,
+        Collections.singletonList(
+            new GenericRecordBuilder(
+                expectedSchema)
+                .set("id", 1)
+                .build()), datum);
+
+    AutoCloseables.close(datum);
+  }
+
+  // With batchSize=1, three written rows must come back as three single-row batches.
+  @Test
+  public void testParquetBatchSize() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
+        1, "a", 2, "b", 3, "c");
+
+    ScanOptions options = new ScanOptions(1);
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(3, datum.size());
+    datum.forEach(batch -> assertEquals(1, batch.getLength()));
+    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
+
+    AutoCloseables.close(datum);
+  }
+
+  // An explicit empty projection selects zero columns (not all columns).
+  @Test
+  public void testEmptyProjectSelectsZeroColumns() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    ScanOptions options = new ScanOptions(100, Optional.of(new String[0]));
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+    org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList());
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(0, schema.getFields().size());
+    assertEquals(1, datum.size());
+    checkParquetReadResult(schema,
+        Collections.singletonList(
+            new GenericRecordBuilder(
+                expectedSchema)
+                .build()), datum);
+
+    AutoCloseables.close(datum);
+  }
+
+  // An absent (Optional.empty) projection selects all columns.
+  @Test
+  public void testNullProjectSelectsAllColumns() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    ScanOptions options = new ScanOptions(100, Optional.empty());
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(1, datum.size());
+    assertEquals(2, schema.getFields().size());
+    assertEquals("id", schema.getFields().get(0).getName());
+    assertEquals("name", schema.getFields().get(1).getName());
+    assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
+    assertEquals(Types.MinorType.VARCHAR.getType(), schema.getFields().get(1).getType());
+    checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
+
+    AutoCloseables.close(datum);
+  }
+
+  // Double-closing a dataset must be a no-op, not an error.
+  @Test
+  public void testNoErrorWhenCloseAgain() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+
+    assertDoesNotThrow(() -> {
+      NativeDataset dataset = factory.finish();
+      dataset.close();
+      dataset.close();
+    });
+  }
+
+  // A NativeScanner is single-use: executing a second task obtained from the
+  // same scanner must fail with UnsupportedOperationException.
+  @Test
+  public void testErrorThrownWhenScanAgain() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    NativeDataset dataset = factory.finish();
+    ScanOptions options = new ScanOptions(100);
+    NativeScanner scanner = dataset.newScan(options);
+    List<? extends NativeScanTask> taskList1 = collect(scanner.scan());
+    List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
+    NativeScanTask task1 = taskList1.get(0);
+    NativeScanTask task2 = taskList2.get(0);
+    List<ArrowRecordBatch> datum = collect(task1.execute());
+
+    UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute);
+    Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
+        uoe.getMessage());
+
+    AutoCloseables.close(datum);
+    AutoCloseables.close(taskList1);
+    AutoCloseables.close(taskList2);
+    AutoCloseables.close(scanner, dataset, factory);
+  }
+
+  // A scan task created on this thread must be executable from another thread.
+  @Test
+  public void testScanInOtherThread() throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    NativeDataset dataset = factory.finish();
+    ScanOptions options = new ScanOptions(100);
+    NativeScanner scanner = dataset.newScan(options);
+    List<? extends NativeScanTask> taskList = collect(scanner.scan());
+    NativeScanTask task = taskList.get(0);
+    List<ArrowRecordBatch> datum = executor.submit(() -> collect(task.execute())).get();
+
+    // NOTE(review): the ExecutorService is never shut down; presumably fine for
+    // a test JVM that exits anyway -- confirm it does not keep the JVM alive.
+    AutoCloseables.close(datum);
+    AutoCloseables.close(taskList);
+    AutoCloseables.close(scanner, dataset, factory);
+  }
+
+  // Scanning after the scanner's native instance is released must fail fast.
+  @Test
+  public void testErrorThrownWhenScanAfterScannerClose() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    NativeDataset dataset = factory.finish();
+    ScanOptions options = new ScanOptions(100);
+    NativeScanner scanner = dataset.newScan(options);
+    scanner.close();
+    assertThrows(NativeInstanceReleasedException.class, scanner::scan);
+  }
+
+  // Executing a task after it has been closed must fail fast.
+  @Test
+  public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    NativeDataset dataset = factory.finish();
+    ScanOptions options = new ScanOptions(100);
+    NativeScanner scanner = dataset.newScan(options);
+    List<? extends NativeScanTask> tasks = collect(scanner.scan());
+    NativeScanTask task = tasks.get(0);
+    task.close();
+    assertThrows(NativeInstanceReleasedException.class, task::execute);
+  }
+
+  // An already-obtained batch iterator must also fail once its task is closed.
+  @Test
+  public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    NativeDataset dataset = factory.finish();
+    ScanOptions options = new ScanOptions(100);
+    NativeScanner scanner = dataset.newScan(options);
+    List<? extends NativeScanTask> tasks = collect(scanner.scan());
+    NativeScanTask task = tasks.get(0);
+    ScanTask.BatchIterator iterator = task.execute();
+    task.close();
+    assertThrows(NativeInstanceReleasedException.class, iterator::hasNext);
+  }
+
+  // Verifies that batch buffers are accounted on the associated allocator:
+  // allocation rises by the batches' accounted size and returns to the
+  // baseline after the batches are closed.
+  @Test
+  public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, writeSupport.getOutputURI());
+    ScanOptions options = new ScanOptions(100);
+    long initReservation = rootAllocator().getAllocatedMemory();
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+    final long expected_diff = datum.stream()
+        .flatMapToLong(batch -> batch.getBuffers()
+            .stream()
+            .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
+    long reservation = rootAllocator().getAllocatedMemory();
+    AutoCloseables.close(datum);
+    long finalReservation = rootAllocator().getAllocatedMemory();
+    Assert.assertEquals(expected_diff, reservation - initReservation);
+    Assert.assertEquals(-expected_diff, finalReservation - reservation);
+  }
+
+  /**
+   * Loads each batch into a VectorSchemaRoot and compares every cell against
+   * the expected Avro records, cell-by-cell via string rendering (which
+   * sidesteps Avro/Arrow type representation differences). Closes each batch
+   * as it is consumed.
+   */
+  private void checkParquetReadResult(Schema schema, List<GenericRecord> expected, List<ArrowRecordBatch> actual) {
+    assertEquals(expected.size(), actual.stream()
+        .mapToInt(ArrowRecordBatch::getLength)
+        .sum());
+    final int fieldCount = schema.getFields().size();
+    LinkedList<GenericRecord> expectedRemovable = new LinkedList<>(expected);
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) {
+      VectorLoader loader = new VectorLoader(vsr);
+      for (ArrowRecordBatch batch : actual) {
+        try {
+          assertEquals(fieldCount, batch.getNodes().size());
+          loader.load(batch);
+          int batchRowCount = vsr.getRowCount();
+          for (int i = 0; i < fieldCount; i++) {
+            FieldVector vector = vsr.getVector(i);
+            for (int j = 0; j < batchRowCount; j++) {
+              Object object = vector.getObject(j);
+              Object expectedObject = expectedRemovable.get(j).get(i);
+              assertEquals(Objects.toString(expectedObject),
+                  Objects.toString(object));
+            }
+          }
+          // Drop the rows that this batch has already verified.
+          for (int i = 0; i < batchRowCount; i++) {
+            expectedRemovable.poll();
+          }
+        } finally {
+          batch.close();
+        }
+      }
+      assertTrue(expectedRemovable.isEmpty());
+    }
+  }
+
+  /** Returns a copy of the Avro schema keeping only fields [from, to). */
+  private org.apache.avro.Schema truncateAvroSchema(org.apache.avro.Schema schema, int from, int to) {
+    List<org.apache.avro.Schema.Field> fields = schema.getFields().subList(from, to);
+    return org.apache.avro.Schema.createRecord(
+        fields.stream()
+            .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
+            .collect(Collectors.toList()));
+  }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java
new file mode 100644
index 000000000..bddf96b5e
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.Test;
+
+/**
+ * Tests construction error handling and double-close behavior of FileSystemDatasetFactory.
+ */
+public class TestFileSystemDatasetFactory {
+
+  @Test
+  public void testErrorHandling() {
+    // FileFormat.NONE carries an illegal native format id (-1, per the
+    // asserted message), so factory construction must fail.
+    RuntimeException e = assertThrows(RuntimeException.class, () -> {
+      new FileSystemDatasetFactory(new RootAllocator(Long.MAX_VALUE), NativeMemoryPool.getDefault(),
+          FileFormat.NONE, "file:///NON_EXIST_FILE");
+    });
+    assertEquals("illegal file format id: -1", e.getMessage());
+  }
+
+  @Test
+  public void testCloseAgain() {
+    // Closing the factory twice must be a no-op.
+    // NOTE(review): the RootAllocator instances created in these tests are
+    // never closed; presumably acceptable for a short-lived test JVM --
+    // confirm no allocator leak check flags them.
+    assertDoesNotThrow(() -> {
+      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(new RootAllocator(Long.MAX_VALUE),
+          NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///NON_EXIST_FILE");
+      factory.close();
+      factory.close();
+    });
+  }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java
new file mode 100644
index 000000000..2a86a2568
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.junit.Assert;
+
+public abstract class TestNativeDataset extends TestDataset {
+ protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) {
+ final Dataset dataset = factory.finish();
+ final Scanner scanner = dataset.newScan(options);
+ Assert.assertEquals(1L, stream(scanner.scan()).count());
+ }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java
new file mode 100644
index 000000000..2bc1b9a41
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestReservationListener extends TestDataset {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+ @Test
+ public void testDirectReservationListener() throws Exception {
+ ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+ NativeMemoryPool pool = NativeMemoryPool.createListenable(DirectReservationListener.instance());
+ FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(),
+ pool, FileFormat.PARQUET,
+ writeSupport.getOutputURI());
+ ScanOptions options = new ScanOptions(100);
+ long initReservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
+ List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+ long reservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
+ AutoCloseables.close(datum);
+ AutoCloseables.close(pool);
+ long finalReservation = DirectReservationListener.instance().getCurrentDirectMemReservation();
+ Assert.assertTrue(reservation >= initReservation);
+ Assert.assertEquals(initReservation, finalReservation);
+ }
+
+ @Test
+ public void testCustomReservationListener() throws Exception {
+ ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
+ final AtomicLong reserved = new AtomicLong(0L);
+ ReservationListener listener = new ReservationListener() {
+ @Override
+ public void reserve(long size) {
+ reserved.getAndAdd(size);
+ }
+
+ @Override
+ public void unreserve(long size) {
+ reserved.getAndAdd(-size);
+ }
+ };
+ NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
+ FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(),
+ pool, FileFormat.PARQUET, writeSupport.getOutputURI());
+ ScanOptions options = new ScanOptions(100);
+ long initReservation = reserved.get();
+ List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+ long reservation = reserved.get();
+ AutoCloseables.close(datum);
+ AutoCloseables.close(pool);
+ long finalReservation = reserved.get();
+ Assert.assertTrue(reservation >= initReservation);
+ Assert.assertEquals(initReservation, finalReservation);
+ }
+}
diff --git a/src/arrow/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/src/arrow/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
new file mode 100644
index 000000000..c81868e42
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNativeUnderlyingMemory {
+
+ private RootAllocator allocator = null;
+
+ @Before
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ protected RootAllocator rootAllocator() {
+ return allocator;
+ }
+
+ @Test
+ public void testReservation() {
+ final RootAllocator root = rootAllocator();
+
+ final int size = 512;
+ final AllocationManager am = new MockUnderlyingMemory(root, size);
+ final BufferLedger ledger = am.associate(root);
+
+ assertEquals(size, root.getAllocatedMemory());
+
+ ledger.release();
+ }
+
+ @Test
+ public void testBufferTransfer() {
+ final RootAllocator root = rootAllocator();
+
+ ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
+ ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
+ assertEquals(0, allocator1.getAllocatedMemory());
+ assertEquals(0, allocator2.getAllocatedMemory());
+
+ final int size = 512;
+ final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
+
+ final BufferLedger owningLedger = am.associate(allocator1);
+ assertEquals(size, owningLedger.getAccountedSize());
+ assertEquals(size, owningLedger.getSize());
+ assertEquals(size, allocator1.getAllocatedMemory());
+
+ final BufferLedger transferredLedger = am.associate(allocator2);
+ owningLedger.release(); // release previous owner
+ assertEquals(0, owningLedger.getAccountedSize());
+ assertEquals(size, owningLedger.getSize());
+ assertEquals(size, transferredLedger.getAccountedSize());
+ assertEquals(size, transferredLedger.getSize());
+ assertEquals(0, allocator1.getAllocatedMemory());
+ assertEquals(size, allocator2.getAllocatedMemory());
+
+ transferredLedger.release();
+ allocator1.close();
+ allocator2.close();
+ }
+
+ /**
+ * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations.
+ */
+ private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
+
+ /**
+ * Constructor.
+ */
+ MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
+ super(accountingAllocator, size, -1L, -1L);
+ }
+
+ @Override
+ protected void release0() {
+ System.out.println("Underlying memory released. Size: " + getSize());
+ }
+
+ @Override
+ protected long memoryAddress() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc b/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc
new file mode 100644
index 000000000..072b64391
--- /dev/null
+++ b/src/arrow/java/dataset/src/test/resources/avroschema/user.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.dataset",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": ["int", "null"]},
+ {"name": "name", "type": ["string", "null"]}
+ ]
+}