summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/tools
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/tools
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/tools')
-rw-r--r--src/arrow/java/tools/pom.xml106
-rw-r--r--src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java146
-rw-r--r--src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java123
-rw-r--r--src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java78
-rw-r--r--src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java244
-rw-r--r--src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java76
-rw-r--r--src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java105
-rw-r--r--src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java301
-rw-r--r--src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java65
-rw-r--r--src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java288
-rw-r--r--src/arrow/java/tools/src/test/resources/logback.xml27
11 files changed, 1559 insertions, 0 deletions
diff --git a/src/arrow/java/tools/pom.xml b/src/arrow/java/tools/pom.xml
new file mode 100644
index 000000000..12a19ae2e
--- /dev/null
+++ b/src/arrow/java/tools/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-java-root</artifactId>
+ <version>6.0.1</version>
+ </parent>
+ <artifactId>arrow-tools</artifactId>
+ <name>Arrow Tools</name>
+ <description>Java applications for working with Arrow ValueVectors.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-compression</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.3</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
new file mode 100644
index 000000000..0ddd1e946
--- /dev/null
+++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple server that echoes back data received.
+ */
+public class EchoServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
+ private final ServerSocket serverSocket;
+ private boolean closed = false;
+
+ /**
+ * Constructs a new instance that binds to the given port.
+ */
+ public EchoServer(int port) throws IOException {
+ LOGGER.debug("Starting echo server.");
+ serverSocket = new ServerSocket(port);
+ LOGGER.debug("Running echo server on port: " + port());
+ }
+
+ /**
+ * Main method to run the server, the first argument is an optional port number.
+ */
+ public static void main(String[] args) throws Exception {
+ int port;
+ if (args.length > 0) {
+ port = Integer.parseInt(args[0]);
+ } else {
+ port = 8080;
+ }
+ new EchoServer(port).run();
+ }
+
+ public int port() {
+ return serverSocket.getLocalPort();
+ }
+
+ /**
+ * Starts the main server event loop.
+ */
+ public void run() throws IOException {
+ try {
+ Socket clientSocket = null;
+ ClientConnection client = null;
+ while (!closed) {
+ LOGGER.debug("Waiting to accept new client connection.");
+ clientSocket = serverSocket.accept();
+ LOGGER.debug("Accepted new client connection.");
+ client = new ClientConnection(clientSocket);
+ try {
+ client.run();
+ } catch (IOException e) {
+ LOGGER.warn("Error handling client connection.", e);
+ }
+ LOGGER.debug("Closed connection with client");
+ }
+ } catch (java.net.SocketException ex) {
+ if (!closed) {
+ throw ex;
+ }
+ } finally {
+ serverSocket.close();
+ LOGGER.debug("Server closed.");
+ }
+ }
+
+ public void close() throws IOException {
+ closed = true;
+ serverSocket.close();
+ }
+
+ /**
+ * Handler for each client connection to the server.
+ */
+ public static class ClientConnection implements AutoCloseable {
+ public final Socket socket;
+
+ public ClientConnection(Socket socket) {
+ this.socket = socket;
+ }
+
+ /**
+ * Reads a record batch off the socket and writes it back out.
+ */
+ public void run() throws IOException {
+ // Read the entire input stream and write it back
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator);
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries
+ reader.loadNextBatch();
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream());
+ writer.start();
+ int echoed = 0;
+ while (true) {
+ int rowCount = reader.getVectorSchemaRoot().getRowCount();
+ if (rowCount == 0) {
+ break;
+ } else {
+ writer.writeBatch();
+ echoed += rowCount;
+ reader.loadNextBatch();
+ }
+ }
+ writer.end();
+ Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
+ LOGGER.debug(String.format("Echoed %d records", echoed));
+ reader.close(false);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ socket.close();
+ }
+ }
+}
diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
new file mode 100644
index 000000000..c49b04c85
--- /dev/null
+++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Application that verifies data can be round-tripped through a file.
+ */
+public class FileRoundtrip {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
+ private final Options options;
+ private final PrintStream out;
+ private final PrintStream err;
+
+ FileRoundtrip(PrintStream out, PrintStream err) {
+ this.out = out;
+ this.err = err;
+ this.options = new Options();
+ this.options.addOption("i", "in", true, "input file");
+ this.options.addOption("o", "out", true, "output file");
+
+ }
+
+ public static void main(String[] args) {
+ System.exit(new FileRoundtrip(System.out, System.err).run(args));
+ }
+
+ private File validateFile(String type, String fileName) {
+ if (fileName == null) {
+ throw new IllegalArgumentException("missing " + type + " file parameter");
+ }
+ File f = new File(fileName);
+ if (!f.exists() || f.isDirectory()) {
+ throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
+ }
+ return f;
+ }
+
+ int run(String[] args) {
+ try {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args, false);
+
+ String inFileName = cmd.getOptionValue("in");
+ String outFileName = cmd.getOptionValue("out");
+
+ File inFile = validateFile("input", inFileName);
+ File outFile = validateFile("output", outFileName);
+
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(inFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator)) {
+
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
+ LOGGER.debug("Input file size: " + inFile.length());
+ LOGGER.debug("Found schema: " + schema);
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader,
+ fileOutputStream.getChannel())) {
+ arrowWriter.start();
+ while (true) {
+ if (!arrowReader.loadNextBatch()) {
+ break;
+ } else {
+ arrowWriter.writeBatch();
+ }
+ }
+ arrowWriter.end();
+ }
+ LOGGER.debug("Output file size: " + outFile.length());
+ }
+ } catch (ParseException e) {
+ return fatalError("Invalid parameters", e);
+ } catch (IOException e) {
+ return fatalError("Error accessing files", e);
+ }
+ return 0;
+ }
+
+ private int fatalError(String message, Throwable e) {
+ err.println(message);
+ LOGGER.error(message, e);
+ return 1;
+ }
+
+}
diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
new file mode 100644
index 000000000..bb7cedeb7
--- /dev/null
+++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+/**
+ * Converts an Arrow file to an Arrow stream. The file should be specified as the
+ * first argument and the output is written to standard out.
+ */
+public class FileToStream {
+ private FileToStream() {}
+
+ /**
+ * Reads an Arrow file from in and writes it back to out.
+ */
+ public static void convert(FileInputStream in, OutputStream out) throws IOException {
+ BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries
+ // only writeBatches if we loaded one in the first place.
+ boolean writeBatches = reader.loadNextBatch();
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
+ writer.start();
+ while (writeBatches) {
+ writer.writeBatch();
+ if (!reader.loadNextBatch()) {
+ break;
+ }
+ }
+ writer.end();
+ }
+ }
+ }
+
+ /**
+ * Main method. The first arg is the file path. The second, optional argument,
+ * is an output file location (defaults to standard out).
+ */
+ public static void main(String[] args) throws IOException {
+ if (args.length != 1 && args.length != 2) {
+ System.err.println("Usage: FileToStream <input file> [output file]");
+ System.exit(1);
+ }
+
+ FileInputStream in = new FileInputStream(new File(args[0]));
+ OutputStream out = args.length == 1 ?
+ System.out : new FileOutputStream(new File(args[1]));
+
+ convert(in, out);
+ }
+}
diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
new file mode 100644
index 000000000..1db3eeb64
--- /dev/null
+++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.arrow.compression.CommonsCompressionFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.JsonFileReader;
+import org.apache.arrow.vector.ipc.JsonFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Validator;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Application for cross language integration testing.
+ */
+public class Integration {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class);
+ private final Options options;
+
+ Integration() {
+ this.options = new Options();
+ this.options.addOption("a", "arrow", true, "arrow file");
+ this.options.addOption("j", "json", true, "json file");
+ this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command
+ .values()));
+ }
+
+ /**
+ * Main method.
+ */
+ public static void main(String[] args) {
+ try {
+ new Integration().run(args);
+ } catch (ParseException e) {
+ fatalError("Invalid parameters", e);
+ } catch (IOException e) {
+ fatalError("Error accessing files", e);
+ } catch (RuntimeException e) {
+ fatalError("Incompatible files", e);
+ }
+ }
+
+ private static void fatalError(String message, Throwable e) {
+ System.err.println(message);
+ System.err.println(e.getMessage());
+ LOGGER.error(message, e);
+ System.exit(1);
+ }
+
+ private File validateFile(String type, String fileName, boolean shouldExist) {
+ if (fileName == null) {
+ throw new IllegalArgumentException("missing " + type + " file parameter");
+ }
+ File f = new File(fileName);
+ if (shouldExist && (!f.exists() || f.isDirectory())) {
+ throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
+ }
+ if (!shouldExist && f.exists()) {
+ throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath());
+ }
+ return f;
+ }
+
+ static void extractDictionaryEncodings(List<Field> fields, List<DictionaryEncoding> encodings) {
+ for (Field field : fields) {
+ DictionaryEncoding encoding = field.getDictionary();
+ if (encoding != null) {
+ encodings.add(encoding);
+ }
+
+ extractDictionaryEncodings(field.getChildren(), encodings);
+ }
+ }
+
+ void run(String[] args) throws ParseException, IOException {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args, false);
+
+ Command command = toCommand(cmd.getOptionValue("command"));
+ File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists);
+ File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists);
+ command.execute(arrowFile, jsonFile);
+ }
+
+ private Command toCommand(String commandName) {
+ try {
+ return Command.valueOf(commandName);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " +
+ Arrays.toString(Command.values()));
+ }
+ }
+
+ /**
+ * Commands (actions) the application can perform.
+ */
+ enum Command {
+ ARROW_TO_JSON(true, false) {
+ @Override
+ public void execute(File arrowFile, File jsonFile) throws IOException {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(arrowFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator)) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
+ LOGGER.debug("Input file size: " + arrowFile.length());
+ LOGGER.debug("Found schema: " + schema);
+ try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config()
+ .pretty(true))) {
+ writer.start(schema, arrowReader);
+ for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+ if (!arrowReader.loadRecordBatch(rbBlock)) {
+ throw new IOException("Expected to load record batch");
+ }
+ writer.write(root);
+ }
+ }
+ LOGGER.debug("Output file size: " + jsonFile.length());
+ }
+ }
+ },
+ JSON_TO_ARROW(false, true) {
+ @Override
+ public void execute(File arrowFile, File jsonFile) throws IOException {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
+ Schema schema = reader.start();
+ LOGGER.debug("Input file size: " + jsonFile.length());
+ LOGGER.debug("Found schema: " + schema);
+ try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
+ VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+ // TODO json dictionaries
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, reader, fileOutputStream
+ .getChannel())) {
+ arrowWriter.start();
+ while (reader.read(root)) {
+ arrowWriter.writeBatch();
+ }
+ arrowWriter.end();
+ }
+ LOGGER.debug("Output file size: " + arrowFile.length());
+ }
+ }
+ },
+ VALIDATE(true, true) {
+ @Override
+ public void execute(File arrowFile, File jsonFile) throws IOException {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
+ FileInputStream fileInputStream = new FileInputStream(arrowFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ allocator, CommonsCompressionFactory.INSTANCE)) {
+ Schema jsonSchema = jsonReader.start();
+ VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
+ Schema arrowSchema = arrowRoot.getSchema();
+ LOGGER.debug("Arrow Input file size: " + arrowFile.length());
+ LOGGER.debug("ARROW schema: " + arrowSchema);
+ LOGGER.debug("JSON Input file size: " + jsonFile.length());
+ LOGGER.debug("JSON schema: " + jsonSchema);
+ Validator.compareSchemas(jsonSchema, arrowSchema);
+
+ List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
+ Iterator<ArrowBlock> iterator = recordBatches.iterator();
+ VectorSchemaRoot jsonRoot;
+ int totalBatches = 0;
+ while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
+ ArrowBlock rbBlock = iterator.next();
+ if (!arrowReader.loadRecordBatch(rbBlock)) {
+ throw new IOException("Expected to load record batch");
+ }
+ Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
+ jsonRoot.close();
+ totalBatches++;
+ }
+
+ // Validate Dictionaries after ArrowFileReader has read batches
+ List<DictionaryEncoding> encodingsJson = new ArrayList<>();
+ extractDictionaryEncodings(jsonSchema.getFields(), encodingsJson);
+ List<DictionaryEncoding> encodingsArrow = new ArrayList<>();
+ extractDictionaryEncodings(arrowSchema.getFields(), encodingsArrow);
+ Validator.compareDictionaries(encodingsJson, encodingsArrow, jsonReader, arrowReader);
+
+ boolean hasMoreJSON = jsonRoot != null;
+ boolean hasMoreArrow = iterator.hasNext();
+ if (hasMoreJSON || hasMoreArrow) {
+ throw new IllegalArgumentException("Unexpected RecordBatches. Total: " + totalBatches +
+ " J:" + hasMoreJSON + " " +
+ "A:" + hasMoreArrow);
+ }
+ }
+ }
+ };
+
+ public final boolean arrowExists;
+ public final boolean jsonExists;
+
+ Command(boolean arrowExists, boolean jsonExists) {
+ this.arrowExists = arrowExists;
+ this.jsonExists = jsonExists;
+ }
+
+ public abstract void execute(File arrowFile, File jsonFile) throws IOException;
+
+ }
+
+}
diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
new file mode 100644
index 000000000..6bd3c2fba
--- /dev/null
+++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.arrow.compression.CommonsCompressionFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+
+/**
+ * Converts an Arrow stream to an Arrow file.
+ */
+public class StreamToFile {
+ /**
+ * Reads an Arrow stream from <code>in</code> and writes it to <code>out</code>.
+ */
+ public static void convert(InputStream in, OutputStream out) throws IOException {
+ BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE)) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries.
+ // Only writeBatches if we load the first one.
+ boolean writeBatches = reader.loadNextBatch();
+ try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
+ writer.start();
+ while (writeBatches) {
+ writer.writeBatch();
+ if (!reader.loadNextBatch()) {
+ break;
+ }
+ }
+ writer.end();
+ }
+ }
+ }
+
+ /**
+ * Main method. Defaults to reading from standard in and standard out.
+ * If there are two arguments the first is interpreted as the input file path,
+ * the second is the output file path.
+ */
+ public static void main(String[] args) throws IOException {
+ InputStream in = System.in;
+ OutputStream out = System.out;
+ if (args.length == 2) {
+ in = new FileInputStream(new File(args[0]));
+ out = new FileOutputStream(new File(args[1]));
+ }
+ convert(in, out);
+ }
+}
diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
new file mode 100644
index 000000000..178a0834f
--- /dev/null
+++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+
+public class ArrowFileTestFixtures {
+ static final int COUNT = 10;
+
+ static void writeData(int count, NonNullableStructVector parent) {
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ StructWriter rootWriter = writer.rootAsStruct();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ for (int i = 0; i < count; i++) {
+ intWriter.setPosition(i);
+ intWriter.writeInt(i);
+ bigIntWriter.setPosition(i);
+ bigIntWriter.writeBigInt(i);
+ }
+ writer.setValueCount(count);
+ }
+
+ static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
+ // read
+ try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer
+ .MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(testOutFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(),
+ readerAllocator)) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
+ for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+ if (!arrowReader.loadRecordBatch(rbBlock)) {
+ throw new IOException("Expected to read record batch");
+ }
+ validateContent(COUNT, root);
+ }
+ }
+ }
+
+ static void validateContent(int count, VectorSchemaRoot root) {
+ Assert.assertEquals(count, root.getRowCount());
+ for (int i = 0; i < count; i++) {
+ Assert.assertEquals(i, root.getVector("int").getObject(i));
+ Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getObject(i));
+ }
+ }
+
+ static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
+ VectorSchemaRoot root = new VectorSchemaRoot(parent);
+ try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream
+ .getChannel())) {
+ arrowWriter.writeBatch();
+ }
+ }
+
+
+ static void writeInput(File testInFile, BufferAllocator allocator) throws
+ FileNotFoundException, IOException {
+ int count = ArrowFileTestFixtures.COUNT;
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0,
+ Integer.MAX_VALUE);
+ NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) {
+ writeData(count, parent);
+ write(parent.getChild("root"), testInFile);
+ }
+ }
+}
diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
new file mode 100644
index 000000000..714cb416b
--- /dev/null
+++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import static java.util.Arrays.asList;
+import static org.apache.arrow.vector.types.Types.MinorType.TINYINT;
+import static org.apache.arrow.vector.types.Types.MinorType.VARCHAR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.testing.ValueVectorDataPopulator;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class EchoServerTest {
+
+ private static EchoServer server;
+ private static int serverPort;
+ private static Thread serverThread;
+
+ @BeforeClass
+ public static void startEchoServer() throws IOException {
+ server = new EchoServer(0);
+ serverPort = server.port();
+ serverThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ server.run();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ serverThread.start();
+ }
+
+ @AfterClass
+ public static void stopEchoServer() throws IOException, InterruptedException {
+ server.close();
+ serverThread.join();
+ }
+
+ private void testEchoServer(int serverPort,
+ Field field,
+ TinyIntVector vector,
+ int batches)
+ throws UnknownHostException, IOException {
+ VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
+ try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ Socket socket = new Socket("localhost", serverPort);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
+ writer.start();
+ for (int i = 0; i < batches; i++) {
+ vector.allocateNew(16);
+ for (int j = 0; j < 8; j++) {
+ vector.set(j, j + i);
+ vector.set(j + 8, 0, (byte) (j + i));
+ }
+ vector.setValueCount(16);
+ root.setRowCount(16);
+ writer.writeBatch();
+ }
+ writer.end();
+
+ assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema());
+
+ TinyIntVector readVector = (TinyIntVector) reader.getVectorSchemaRoot()
+ .getFieldVectors().get(0);
+ for (int i = 0; i < batches; i++) {
+ Assert.assertTrue(reader.loadNextBatch());
+ assertEquals(16, reader.getVectorSchemaRoot().getRowCount());
+ assertEquals(16, readVector.getValueCount());
+ for (int j = 0; j < 8; j++) {
+ assertEquals(j + i, readVector.get(j));
+ assertTrue(readVector.isNull(j + 8));
+ }
+ }
+ Assert.assertFalse(reader.loadNextBatch());
+ assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+ assertEquals(reader.bytesRead(), writer.bytesWritten());
+ }
+ }
+
+ @Test
+ public void basicTest() throws InterruptedException, IOException {
+ BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+
+ Field field = new Field(
+ "testField",
+ new FieldType(true, new ArrowType.Int(8, true), null, null),
+ Collections.<Field>emptyList());
+ TinyIntVector vector =
+ new TinyIntVector("testField", FieldType.nullable(TINYINT.getType()), alloc);
+ Schema schema = new Schema(asList(field));
+
+ // Try an empty stream, just the header.
+ testEchoServer(serverPort, field, vector, 0);
+
+ // Try with one batch.
+ testEchoServer(serverPort, field, vector, 1);
+
+ // Try with a few
+ testEchoServer(serverPort, field, vector, 10);
+ }
+
+ @Test
+ public void testFlatDictionary() throws IOException {
+ DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null);
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ IntVector writeVector =
+ new IntVector(
+ "varchar",
+ new FieldType(true, MinorType.INT.getType(), writeEncoding, null),
+ allocator);
+ VarCharVector writeDictionaryVector =
+ new VarCharVector(
+ "dict",
+ FieldType.nullable(VARCHAR.getType()),
+ allocator)) {
+
+ ValueVectorDataPopulator.setVector(writeVector, 0, 1, null, 2, 1, 2);
+ ValueVectorDataPopulator.setVector(writeDictionaryVector, "foo".getBytes(StandardCharsets.UTF_8),
+ "bar".getBytes(StandardCharsets.UTF_8), "baz".getBytes(StandardCharsets.UTF_8));
+
+ List<Field> fields = ImmutableList.of(writeVector.getField());
+ List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6);
+
+ DictionaryProvider writeProvider = new MapDictionaryProvider(
+ new Dictionary(writeDictionaryVector, writeEncoding));
+
+ try (Socket socket = new Socket("localhost", serverPort);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket
+ .getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+
+ reader.loadNextBatch();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ Assert.assertEquals(6, readerRoot.getRowCount());
+
+ FieldVector readVector = readerRoot.getFieldVectors().get(0);
+ Assert.assertNotNull(readVector);
+
+ DictionaryEncoding readEncoding = readVector.getField().getDictionary();
+ Assert.assertNotNull(readEncoding);
+ Assert.assertEquals(1L, readEncoding.getId());
+
+ Assert.assertEquals(6, readVector.getValueCount());
+ Assert.assertEquals(0, readVector.getObject(0));
+ Assert.assertEquals(1, readVector.getObject(1));
+ Assert.assertEquals(null, readVector.getObject(2));
+ Assert.assertEquals(2, readVector.getObject(3));
+ Assert.assertEquals(1, readVector.getObject(4));
+ Assert.assertEquals(2, readVector.getObject(5));
+
+ Dictionary dictionary = reader.lookup(1L);
+ Assert.assertNotNull(dictionary);
+ VarCharVector dictionaryVector = ((VarCharVector) dictionary.getVector());
+ Assert.assertEquals(3, dictionaryVector.getValueCount());
+ Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0));
+ Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1));
+ Assert.assertEquals(new Text("baz"), dictionaryVector.getObject(2));
+ }
+ }
+ }
+
+ @Test
+ public void testNestedDictionary() throws IOException {
+ DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null);
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ VarCharVector writeDictionaryVector =
+ new VarCharVector("dictionary", FieldType.nullable(VARCHAR.getType()), allocator);
+ ListVector writeVector = ListVector.empty("list", allocator)) {
+
+ // data being written:
+ // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+ writeDictionaryVector.allocateNew();
+ writeDictionaryVector.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+ writeDictionaryVector.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+ writeDictionaryVector.setValueCount(2);
+
+ writeVector.addOrGetVector(new FieldType(true, MinorType.INT.getType(), writeEncoding, null));
+ writeVector.allocateNew();
+ UnionListWriter listWriter = new UnionListWriter(writeVector);
+ listWriter.startList();
+ listWriter.writeInt(0);
+ listWriter.writeInt(1);
+ listWriter.endList();
+ listWriter.startList();
+ listWriter.writeInt(0);
+ listWriter.endList();
+ listWriter.startList();
+ listWriter.writeInt(1);
+ listWriter.endList();
+ listWriter.setValueCount(3);
+
+ List<Field> fields = ImmutableList.of(writeVector.getField());
+ List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+ DictionaryProvider writeProvider = new MapDictionaryProvider(
+ new Dictionary(writeDictionaryVector, writeEncoding));
+
+ try (Socket socket = new Socket("localhost", serverPort);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket
+ .getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+
+ reader.loadNextBatch();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ Assert.assertEquals(3, readerRoot.getRowCount());
+
+ ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0);
+ Assert.assertNotNull(readVector);
+
+ Assert.assertNull(readVector.getField().getDictionary());
+ DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0)
+ .getDictionary();
+ Assert.assertNotNull(readEncoding);
+ Assert.assertEquals(2L, readEncoding.getId());
+
+ Field nestedField = readVector.getField().getChildren().get(0);
+
+ DictionaryEncoding encoding = nestedField.getDictionary();
+ Assert.assertNotNull(encoding);
+ Assert.assertEquals(2L, encoding.getId());
+ Assert.assertEquals(new Int(32, true), encoding.getIndexType());
+
+ Assert.assertEquals(3, readVector.getValueCount());
+ Assert.assertEquals(Arrays.asList(0, 1), readVector.getObject(0));
+ Assert.assertEquals(Arrays.asList(0), readVector.getObject(1));
+ Assert.assertEquals(Arrays.asList(1), readVector.getObject(2));
+
+ Dictionary readDictionary = reader.lookup(2L);
+ Assert.assertNotNull(readDictionary);
+ VarCharVector dictionaryVector = ((VarCharVector) readDictionary.getVector());
+ Assert.assertEquals(2, dictionaryVector.getValueCount());
+ Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0));
+ Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1));
+ }
+ }
+ }
+}
diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
new file mode 100644
index 000000000..ddac6f793
--- /dev/null
+++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput;
+import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFileRoundtrip {
+
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ private BufferAllocator allocator;
+
+ @Before
+ public void init() {
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ }
+
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ File testInFile = testFolder.newFile("testIn.arrow");
+ File testOutFile = testFolder.newFile("testOut.arrow");
+
+ writeInput(testInFile, allocator);
+
+ String[] args = {"-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()};
+ int result = new FileRoundtrip(System.out, System.err).run(args);
+ assertEquals(0, result);
+
+ validateOutput(testOutFile, allocator);
+ }
+
+}
diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
new file mode 100644
index 000000000..1232c6c1d
--- /dev/null
+++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.tools;
+
+import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput;
+import static org.apache.arrow.tools.ArrowFileTestFixtures.write;
+import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData;
+import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Map;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.tools.Integration.Command;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+public class TestIntegration {
+
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ private BufferAllocator allocator;
+ private ObjectMapper om = new ObjectMapper();
+
+ {
+ DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
+ prettyPrinter.indentArraysWith(NopIndenter.instance);
+ om.setDefaultPrettyPrinter(prettyPrinter);
+ om.enable(SerializationFeature.INDENT_OUTPUT);
+ om.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+ }
+
+ static void writeInputFloat(File testInFile, BufferAllocator allocator, double... f) throws
+ FileNotFoundException, IOException {
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0,
+ Integer.MAX_VALUE);
+ NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) {
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ StructWriter rootWriter = writer.rootAsStruct();
+ Float8Writer floatWriter = rootWriter.float8("float");
+ for (int i = 0; i < f.length; i++) {
+ floatWriter.setPosition(i);
+ floatWriter.writeFloat8(f[i]);
+ }
+ writer.setValueCount(f.length);
+ write(parent.getChild("root"), testInFile);
+ }
+ }
+
+ static void writeInput2(File testInFile, BufferAllocator allocator) throws
+ FileNotFoundException, IOException {
+ int count = ArrowFileTestFixtures.COUNT;
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0,
+ Integer.MAX_VALUE);
+ NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) {
+ writeData(count, parent);
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ StructWriter rootWriter = writer.rootAsStruct();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ intWriter.setPosition(5);
+ intWriter.writeInt(999);
+ bigIntWriter.setPosition(4);
+ bigIntWriter.writeBigInt(777L);
+ writer.setValueCount(count);
+ write(parent.getChild("root"), testInFile);
+ }
+ }
+
+ @Before
+ public void init() {
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ }
+
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testValid() throws Exception {
+ File testInFile = testFolder.newFile("testIn.arrow");
+ File testJSONFile = testFolder.newFile("testOut.json");
+ testJSONFile.delete();
+ File testOutFile = testFolder.newFile("testOut.arrow");
+ testOutFile.delete();
+
+ // generate an arrow file
+ writeInput(testInFile, allocator);
+
+ Integration integration = new Integration();
+
+ // convert it to json
+ String[] args1 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args1);
+
+ // convert back to arrow
+ String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+ integration.run(args2);
+
+ // check it is the same
+ validateOutput(testOutFile, allocator);
+
+ // validate arrow against json
+ String[] args3 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.VALIDATE.name()};
+ integration.run(args3);
+ }
+
+ @Test
+ public void testJSONRoundTripWithVariableWidth() throws Exception {
+ File testJSONFile = new File("../../docs/source/format/integration_json_examples/simple.json").getCanonicalFile();
+ if (!testJSONFile.exists()) {
+ testJSONFile = new File("../docs/source/format/integration_json_examples/simple.json");
+ }
+ File testOutFile = testFolder.newFile("testOut.arrow");
+ File testRoundTripJSONFile = testFolder.newFile("testOut.json");
+ testOutFile.delete();
+ testRoundTripJSONFile.delete();
+
+ Integration integration = new Integration();
+
+ // convert to arrow
+ String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+ integration.run(args1);
+
+ // convert back to json
+ String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile
+ .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args2);
+
+ BufferedReader orig = readNormalized(testJSONFile);
+ BufferedReader rt = readNormalized(testRoundTripJSONFile);
+ String i;
+ String o;
+ int j = 0;
+ while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) {
+ assertEquals("line: " + j, i, o);
+ ++j;
+ }
+ }
+
+ @Test
+ public void testJSONRoundTripWithStruct() throws Exception {
+ File testJSONFile = new File("../../docs/source/format/integration_json_examples/struct.json").getCanonicalFile();
+ if (!testJSONFile.exists()) {
+ testJSONFile = new File("../docs/source/format/integration_json_examples/struct.json");
+ }
+ File testOutFile = testFolder.newFile("testOutStruct.arrow");
+ File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json");
+ testOutFile.delete();
+ testRoundTripJSONFile.delete();
+
+ Integration integration = new Integration();
+
+ // convert to arrow
+ String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+ integration.run(args1);
+
+ // convert back to json
+ String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile
+ .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args2);
+
+ BufferedReader orig = readNormalized(testJSONFile);
+ BufferedReader rt = readNormalized(testRoundTripJSONFile);
+ String i;
+ String o;
+ int j = 0;
+ while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) {
+ assertEquals("line: " + j, i, o);
+ ++j;
+ }
+ }
+
+ private BufferedReader readNormalized(File f) throws IOException {
+ Map<?, ?> tree = om.readValue(f.getCanonicalFile(), Map.class);
+ String normalized = om.writeValueAsString(tree);
+ return new BufferedReader(new StringReader(normalized));
+ }
+
+ /**
+ * The test should not be sensitive to small variations in float representation.
+ */
+ @Test
+ public void testFloat() throws Exception {
+ File testValidInFile = testFolder.newFile("testValidFloatIn.arrow");
+ File testInvalidInFile = testFolder.newFile("testAlsoValidFloatIn.arrow");
+ File testJSONFile = testFolder.newFile("testValidOut.json");
+ testJSONFile.delete();
+
+ // generate an arrow file
+ writeInputFloat(testValidInFile, allocator, 912.4140000000002, 912.414);
+ // generate a different arrow file
+ writeInputFloat(testInvalidInFile, allocator, 912.414, 912.4140000000002);
+
+ Integration integration = new Integration();
+
+ // convert the "valid" file to json
+ String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args1);
+
+ // compare the "invalid" file to the "valid" json
+ String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.VALIDATE.name()};
+ // this should fail
+ integration.run(args3);
+ }
+
+ @Test
+ public void testInvalid() throws Exception {
+ File testValidInFile = testFolder.newFile("testValidIn.arrow");
+ File testInvalidInFile = testFolder.newFile("testInvalidIn.arrow");
+ File testJSONFile = testFolder.newFile("testInvalidOut.json");
+ testJSONFile.delete();
+
+ // generate an arrow file
+ writeInput(testValidInFile, allocator);
+ // generate a different arrow file
+ writeInput2(testInvalidInFile, allocator);
+
+ Integration integration = new Integration();
+
+ // convert the "valid" file to json
+ String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args1);
+
+ // compare the "invalid" file to the "valid" json
+ String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile
+ .getAbsolutePath(), "-command", Command.VALIDATE.name()};
+ // this should fail
+ try {
+ integration.run(args3);
+ fail("should have failed");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Different values in column"));
+ assertTrue(e.getMessage(), e.getMessage().contains("999"));
+ }
+
+ }
+}
diff --git a/src/arrow/java/tools/src/test/resources/logback.xml b/src/arrow/java/tools/src/test/resources/logback.xml
new file mode 100644
index 000000000..ff848da2a
--- /dev/null
+++ b/src/arrow/java/tools/src/test/resources/logback.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<configuration>
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <logger name="org.apache.arrow" additivity="false">
+ <level value="info" />
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+</configuration>