From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/java/tools/pom.xml | 106 ++++++++ .../java/org/apache/arrow/tools/EchoServer.java | 146 ++++++++++ .../java/org/apache/arrow/tools/FileRoundtrip.java | 123 +++++++++ .../java/org/apache/arrow/tools/FileToStream.java | 78 ++++++ .../java/org/apache/arrow/tools/Integration.java | 244 +++++++++++++++++ .../java/org/apache/arrow/tools/StreamToFile.java | 76 ++++++ .../apache/arrow/tools/ArrowFileTestFixtures.java | 105 +++++++ .../org/apache/arrow/tools/EchoServerTest.java | 301 +++++++++++++++++++++ .../org/apache/arrow/tools/TestFileRoundtrip.java | 65 +++++ .../org/apache/arrow/tools/TestIntegration.java | 288 ++++++++++++++++++++ .../java/tools/src/test/resources/logback.xml | 27 ++ 11 files changed, 1559 insertions(+) create mode 100644 src/arrow/java/tools/pom.xml create mode 100644 src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java create mode 100644 src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java create mode 100644 src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java create mode 100644 src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java create mode 100644 src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java create mode 100644 src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java create mode 100644 src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java create mode 100644 src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java create mode 100644 src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java create mode 100644 src/arrow/java/tools/src/test/resources/logback.xml (limited to 'src/arrow/java/tools') diff --git a/src/arrow/java/tools/pom.xml b/src/arrow/java/tools/pom.xml new file mode 100644 index 000000000..12a19ae2e --- /dev/null +++ b/src/arrow/java/tools/pom.xml @@ -0,0 +1,106 @@ + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 6.0.1 + + arrow-tools + Arrow Tools + Java applications for working with Arrow ValueVectors. + + + + org.apache.arrow + arrow-memory-core + ${project.version} + + + org.apache.arrow + arrow-vector + ${project.version} + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-compression + ${project.version} + + + com.google.guava + guava + + + commons-cli + commons-cli + 1.4 + + + ch.qos.logback + logback-classic + 1.2.3 + runtime + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.slf4j + slf4j-api + + + org.apache.arrow + arrow-memory-netty + ${project.version} + runtime + + + org.apache.arrow + arrow-vector + ${project.version} + tests + test-jar + test + + + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java new file mode 100644 index 000000000..0ddd1e946 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple server that echoes back data received. + */ +public class EchoServer { + private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class); + private final ServerSocket serverSocket; + private boolean closed = false; + + /** + * Constructs a new instance that binds to the given port. + */ + public EchoServer(int port) throws IOException { + LOGGER.debug("Starting echo server."); + serverSocket = new ServerSocket(port); + LOGGER.debug("Running echo server on port: " + port()); + } + + /** + * Main method to run the server, the first argument is an optional port number. + */ + public static void main(String[] args) throws Exception { + int port; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } else { + port = 8080; + } + new EchoServer(port).run(); + } + + public int port() { + return serverSocket.getLocalPort(); + } + + /** + * Starts the main server event loop. + */ + public void run() throws IOException { + try { + Socket clientSocket = null; + ClientConnection client = null; + while (!closed) { + LOGGER.debug("Waiting to accept new client connection."); + clientSocket = serverSocket.accept(); + LOGGER.debug("Accepted new client connection."); + client = new ClientConnection(clientSocket); + try { + client.run(); + } catch (IOException e) { + LOGGER.warn("Error handling client connection.", e); + } + LOGGER.debug("Closed connection with client"); + } + } catch (java.net.SocketException ex) { + if (!closed) { + throw ex; + } + } finally { + serverSocket.close(); + LOGGER.debug("Server closed."); + } + } + + public void close() throws IOException { + closed = true; + serverSocket.close(); + } + + /** + * Handler for each client connection to the server. + */ + public static class ClientConnection implements AutoCloseable { + public final Socket socket; + + public ClientConnection(Socket socket) { + this.socket = socket; + } + + /** + * Reads a record batch off the socket and writes it back out. + */ + public void run() throws IOException { + // Read the entire input stream and write it back + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + reader.loadNextBatch(); + ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream()); + writer.start(); + int echoed = 0; + while (true) { + int rowCount = reader.getVectorSchemaRoot().getRowCount(); + if (rowCount == 0) { + break; + } else { + writer.writeBatch(); + echoed += rowCount; + reader.loadNextBatch(); + } + } + writer.end(); + Preconditions.checkState(reader.bytesRead() == writer.bytesWritten()); + LOGGER.debug(String.format("Echoed %d records", echoed)); + reader.close(false); + } + } + + @Override + public void close() throws IOException { + socket.close(); + } + } +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java new file mode 100644 index 000000000..c49b04c85 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Application that verifies data can be round-tripped through a file. + */ +public class FileRoundtrip { + private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class); + private final Options options; + private final PrintStream out; + private final PrintStream err; + + FileRoundtrip(PrintStream out, PrintStream err) { + this.out = out; + this.err = err; + this.options = new Options(); + this.options.addOption("i", "in", true, "input file"); + this.options.addOption("o", "out", true, "output file"); + + } + + public static void main(String[] args) { + System.exit(new FileRoundtrip(System.out, System.err).run(args)); + } + + private File validateFile(String type, String fileName) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (!f.exists() || f.isDirectory()) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + return f; + } + + int run(String[] args) { + try { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + String inFileName = cmd.getOptionValue("in"); + String outFileName = cmd.getOptionValue("out"); + + File inFile = validateFile("input", inFileName); + File outFile = validateFile("output", outFileName); + + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(inFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { + + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("Input file size: " + inFile.length()); + LOGGER.debug("Found schema: " + schema); + + try (FileOutputStream fileOutputStream = new FileOutputStream(outFile); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, + fileOutputStream.getChannel())) { + arrowWriter.start(); + while (true) { + if (!arrowReader.loadNextBatch()) { + break; + } else { + arrowWriter.writeBatch(); + } + } + arrowWriter.end(); + } + LOGGER.debug("Output file size: " + outFile.length()); + } + } catch (ParseException e) { + return fatalError("Invalid parameters", e); + } catch (IOException e) { + return fatalError("Error accessing files", e); + } + return 0; + } + + private int fatalError(String message, Throwable e) { + err.println(message); + LOGGER.error(message, e); + return 1; + } + +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java new file mode 100644 index 000000000..bb7cedeb7 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; + +/** + * Converts an Arrow file to an Arrow stream. The file should be specified as the + * first argument and the output is written to standard out. + */ +public class FileToStream { + private FileToStream() {} + + /** + * Reads an Arrow file from in and writes it back to out. + */ + public static void convert(FileInputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + // only writeBatches if we loaded one in the first place. + boolean writeBatches = reader.loadNextBatch(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) { + writer.start(); + while (writeBatches) { + writer.writeBatch(); + if (!reader.loadNextBatch()) { + break; + } + } + writer.end(); + } + } + } + + /** + * Main method. The first arg is the file path. The second, optional argument, + * is an output file location (defaults to standard out). + */ + public static void main(String[] args) throws IOException { + if (args.length != 1 && args.length != 2) { + System.err.println("Usage: FileToStream [output file]"); + System.exit(1); + } + + FileInputStream in = new FileInputStream(new File(args[0])); + OutputStream out = args.length == 1 ? + System.out : new FileOutputStream(new File(args[1])); + + convert(in, out); + } +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java new file mode 100644 index 000000000..1db3eeb64 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.JsonFileReader; +import org.apache.arrow.vector.ipc.JsonFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Validator; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Application for cross language integration testing. + */ +public class Integration { + private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class); + private final Options options; + + Integration() { + this.options = new Options(); + this.options.addOption("a", "arrow", true, "arrow file"); + this.options.addOption("j", "json", true, "json file"); + this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command + .values())); + } + + /** + * Main method. + */ + public static void main(String[] args) { + try { + new Integration().run(args); + } catch (ParseException e) { + fatalError("Invalid parameters", e); + } catch (IOException e) { + fatalError("Error accessing files", e); + } catch (RuntimeException e) { + fatalError("Incompatible files", e); + } + } + + private static void fatalError(String message, Throwable e) { + System.err.println(message); + System.err.println(e.getMessage()); + LOGGER.error(message, e); + System.exit(1); + } + + private File validateFile(String type, String fileName, boolean shouldExist) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (shouldExist && (!f.exists() || f.isDirectory())) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + if (!shouldExist && f.exists()) { + throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath()); + } + return f; + } + + static void extractDictionaryEncodings(List fields, List encodings) { + for (Field field : fields) { + DictionaryEncoding encoding = field.getDictionary(); + if (encoding != null) { + encodings.add(encoding); + } + + extractDictionaryEncodings(field.getChildren(), encodings); + } + } + + void run(String[] args) throws ParseException, IOException { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + Command command = toCommand(cmd.getOptionValue("command")); + File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists); + File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists); + command.execute(arrowFile, jsonFile); + } + + private Command toCommand(String commandName) { + try { + return Command.valueOf(commandName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + + Arrays.toString(Command.values())); + } + } + + /** + * Commands (actions) the application can perform. + */ + enum Command { + ARROW_TO_JSON(true, false) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("Input file size: " + arrowFile.length()); + LOGGER.debug("Found schema: " + schema); + try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config() + .pretty(true))) { + writer.start(schema, arrowReader); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to load record batch"); + } + writer.write(root); + } + } + LOGGER.debug("Output file size: " + jsonFile.length()); + } + } + }, + JSON_TO_ARROW(false, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) { + Schema schema = reader.start(); + LOGGER.debug("Input file size: " + jsonFile.length()); + LOGGER.debug("Found schema: " + schema); + try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + // TODO json dictionaries + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, reader, fileOutputStream + .getChannel())) { + arrowWriter.start(); + while (reader.read(root)) { + arrowWriter.writeBatch(); + } + arrowWriter.end(); + } + LOGGER.debug("Output file size: " + arrowFile.length()); + } + } + }, + VALIDATE(true, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator, CommonsCompressionFactory.INSTANCE)) { + Schema jsonSchema = jsonReader.start(); + VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); + Schema arrowSchema = arrowRoot.getSchema(); + LOGGER.debug("Arrow Input file size: " + arrowFile.length()); + LOGGER.debug("ARROW schema: " + arrowSchema); + LOGGER.debug("JSON Input file size: " + jsonFile.length()); + LOGGER.debug("JSON schema: " + jsonSchema); + Validator.compareSchemas(jsonSchema, arrowSchema); + + List recordBatches = arrowReader.getRecordBlocks(); + Iterator iterator = recordBatches.iterator(); + VectorSchemaRoot jsonRoot; + int totalBatches = 0; + while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) { + ArrowBlock rbBlock = iterator.next(); + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to load record batch"); + } + Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot); + jsonRoot.close(); + totalBatches++; + } + + // Validate Dictionaries after ArrowFileReader has read batches + List encodingsJson = new ArrayList<>(); + extractDictionaryEncodings(jsonSchema.getFields(), encodingsJson); + List encodingsArrow = new ArrayList<>(); + extractDictionaryEncodings(arrowSchema.getFields(), encodingsArrow); + Validator.compareDictionaries(encodingsJson, encodingsArrow, jsonReader, arrowReader); + + boolean hasMoreJSON = jsonRoot != null; + boolean hasMoreArrow = iterator.hasNext(); + if (hasMoreJSON || hasMoreArrow) { + throw new IllegalArgumentException("Unexpected RecordBatches. Total: " + totalBatches + + " J:" + hasMoreJSON + " " + + "A:" + hasMoreArrow); + } + } + } + }; + + public final boolean arrowExists; + public final boolean jsonExists; + + Command(boolean arrowExists, boolean jsonExists) { + this.arrowExists = arrowExists; + this.jsonExists = jsonExists; + } + + public abstract void execute(File arrowFile, File jsonFile) throws IOException; + + } + +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java new file mode 100644 index 000000000..6bd3c2fba --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowStreamReader; + +/** + * Converts an Arrow stream to an Arrow file. + */ +public class StreamToFile { + /** + * Reads an Arrow stream from in and writes it to out. + */ + public static void convert(InputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries. + // Only writeBatches if we load the first one. + boolean writeBatches = reader.loadNextBatch(); + try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) { + writer.start(); + while (writeBatches) { + writer.writeBatch(); + if (!reader.loadNextBatch()) { + break; + } + } + writer.end(); + } + } + } + + /** + * Main method. Defaults to reading from standard in and standard out. + * If there are two arguments the first is interpreted as the input file path, + * the second is the output file path. + */ + public static void main(String[] args) throws IOException { + InputStream in = System.in; + OutputStream out = System.out; + if (args.length == 2) { + in = new FileInputStream(new File(args[0])); + out = new FileOutputStream(new File(args[1])); + } + convert(in, out); + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java new file mode 100644 index 000000000..178a0834f --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; + +public class ArrowFileTestFixtures { + static final int COUNT = 10; + + static void writeData(int count, NonNullableStructVector parent) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + } + + static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception { + // read + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer + .MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(testOutFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to read record batch"); + } + validateContent(COUNT, root); + } + } + } + + static void validateContent(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < count; i++) { + Assert.assertEquals(i, root.getVector("int").getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getObject(i)); + } + } + + static void write(FieldVector parent, File file) throws FileNotFoundException, IOException { + VectorSchemaRoot root = new VectorSchemaRoot(parent); + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream + .getChannel())) { + arrowWriter.writeBatch(); + } + } + + + static void writeInput(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + writeData(count, parent); + write(parent.getChild("root"), testInFile); + } + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java new file mode 100644 index 000000000..714cb416b --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static java.util.Arrays.asList; +import static org.apache.arrow.vector.types.Types.MinorType.TINYINT; +import static org.apache.arrow.vector.types.Types.MinorType.VARCHAR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.testing.ValueVectorDataPopulator; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +public class EchoServerTest { + + private static EchoServer server; + private static int serverPort; + private static Thread serverThread; + + @BeforeClass + public static void startEchoServer() throws IOException { + server = new EchoServer(0); + serverPort = server.port(); + serverThread = new Thread() { + @Override + public void run() { + try { + server.run(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + serverThread.start(); + } + + @AfterClass + public static void stopEchoServer() throws IOException, InterruptedException { + server.close(); + serverThread.join(); + } + + private void testEchoServer(int serverPort, + Field field, + TinyIntVector vector, + int batches) + throws UnknownHostException, IOException { + VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); + try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { + writer.start(); + for (int i = 0; i < batches; i++) { + vector.allocateNew(16); + for (int j = 0; j < 8; j++) { + vector.set(j, j + i); + vector.set(j + 8, 0, (byte) (j + i)); + } + vector.setValueCount(16); + root.setRowCount(16); + writer.writeBatch(); + } + writer.end(); + + assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema()); + + TinyIntVector readVector = (TinyIntVector) reader.getVectorSchemaRoot() + .getFieldVectors().get(0); + for (int i = 0; i < batches; i++) { + Assert.assertTrue(reader.loadNextBatch()); + assertEquals(16, reader.getVectorSchemaRoot().getRowCount()); + assertEquals(16, readVector.getValueCount()); + for (int j = 0; j < 8; j++) { + assertEquals(j + i, readVector.get(j)); + assertTrue(readVector.isNull(j + 8)); + } + } + Assert.assertFalse(reader.loadNextBatch()); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); + assertEquals(reader.bytesRead(), writer.bytesWritten()); + } + } + + @Test + public void basicTest() throws InterruptedException, IOException { + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + + Field field = new Field( + "testField", + new FieldType(true, new ArrowType.Int(8, true), null, null), + Collections.emptyList()); + TinyIntVector vector = + new TinyIntVector("testField", FieldType.nullable(TINYINT.getType()), alloc); + Schema schema = new Schema(asList(field)); + + // Try an empty stream, just the header. + testEchoServer(serverPort, field, vector, 0); + + // Try with one batch. + testEchoServer(serverPort, field, vector, 1); + + // Try with a few + testEchoServer(serverPort, field, vector, 10); + } + + @Test + public void testFlatDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + IntVector writeVector = + new IntVector( + "varchar", + new FieldType(true, MinorType.INT.getType(), writeEncoding, null), + allocator); + VarCharVector writeDictionaryVector = + new VarCharVector( + "dict", + FieldType.nullable(VARCHAR.getType()), + allocator)) { + + ValueVectorDataPopulator.setVector(writeVector, 0, 1, null, 2, 1, 2); + ValueVectorDataPopulator.setVector(writeDictionaryVector, "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), "baz".getBytes(StandardCharsets.UTF_8)); + + List fields = ImmutableList.of(writeVector.getField()); + List vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6); + + DictionaryProvider writeProvider = new MapDictionaryProvider( + new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(6, readerRoot.getRowCount()); + + FieldVector readVector = readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + DictionaryEncoding readEncoding = readVector.getField().getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(1L, readEncoding.getId()); + + Assert.assertEquals(6, readVector.getValueCount()); + Assert.assertEquals(0, readVector.getObject(0)); + Assert.assertEquals(1, readVector.getObject(1)); + Assert.assertEquals(null, readVector.getObject(2)); + Assert.assertEquals(2, readVector.getObject(3)); + Assert.assertEquals(1, readVector.getObject(4)); + Assert.assertEquals(2, readVector.getObject(5)); + + Dictionary dictionary = reader.lookup(1L); + Assert.assertNotNull(dictionary); + VarCharVector dictionaryVector = ((VarCharVector) dictionary.getVector()); + Assert.assertEquals(3, dictionaryVector.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1)); + Assert.assertEquals(new Text("baz"), dictionaryVector.getObject(2)); + } + } + } + + @Test + public void testNestedDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + VarCharVector writeDictionaryVector = + new VarCharVector("dictionary", FieldType.nullable(VARCHAR.getType()), allocator); + ListVector writeVector = ListVector.empty("list", allocator)) { + + // data being written: + // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]] + + writeDictionaryVector.allocateNew(); + writeDictionaryVector.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.setValueCount(2); + + writeVector.addOrGetVector(new FieldType(true, MinorType.INT.getType(), writeEncoding, null)); + writeVector.allocateNew(); + UnionListWriter listWriter = new UnionListWriter(writeVector); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.setValueCount(3); + + List fields = ImmutableList.of(writeVector.getField()); + List vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3); + + DictionaryProvider writeProvider = new MapDictionaryProvider( + new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(3, readerRoot.getRowCount()); + + ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + Assert.assertNull(readVector.getField().getDictionary()); + DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0) + .getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(2L, readEncoding.getId()); + + Field nestedField = readVector.getField().getChildren().get(0); + + DictionaryEncoding encoding = nestedField.getDictionary(); + Assert.assertNotNull(encoding); + Assert.assertEquals(2L, encoding.getId()); + Assert.assertEquals(new Int(32, true), encoding.getIndexType()); + + Assert.assertEquals(3, readVector.getValueCount()); + Assert.assertEquals(Arrays.asList(0, 1), readVector.getObject(0)); + Assert.assertEquals(Arrays.asList(0), readVector.getObject(1)); + Assert.assertEquals(Arrays.asList(1), readVector.getObject(2)); + + Dictionary readDictionary = reader.lookup(2L); + Assert.assertNotNull(readDictionary); + VarCharVector dictionaryVector = ((VarCharVector) readDictionary.getVector()); + Assert.assertEquals(2, dictionaryVector.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1)); + } + } + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java new file mode 100644 index 000000000..ddac6f793 --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestFileRoundtrip { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void test() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testOutFile = testFolder.newFile("testOut.arrow"); + + writeInput(testInFile, allocator); + + String[] args = {"-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; + int result = new FileRoundtrip(System.out, System.err).run(args); + assertEquals(0, result); + + validateOutput(testOutFile, allocator); + } + +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java new file mode 100644 index 000000000..1232c6c1d --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.write; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.util.Map; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.tools.Integration.Command; +import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.Float8Writer; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +public class TestIntegration { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + private ObjectMapper om = new ObjectMapper(); + + { + DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); + prettyPrinter.indentArraysWith(NopIndenter.instance); + om.setDefaultPrettyPrinter(prettyPrinter); + om.enable(SerializationFeature.INDENT_OUTPUT); + om.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + } + + static void writeInputFloat(File testInFile, BufferAllocator allocator, double... f) throws + FileNotFoundException, IOException { + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + Float8Writer floatWriter = rootWriter.float8("float"); + for (int i = 0; i < f.length; i++) { + floatWriter.setPosition(i); + floatWriter.writeFloat8(f[i]); + } + writer.setValueCount(f.length); + write(parent.getChild("root"), testInFile); + } + } + + static void writeInput2(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + writeData(count, parent); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + intWriter.setPosition(5); + intWriter.writeInt(999); + bigIntWriter.setPosition(4); + bigIntWriter.writeBigInt(777L); + writer.setValueCount(count); + write(parent.getChild("root"), testInFile); + } + } + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void testValid() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testJSONFile = testFolder.newFile("testOut.json"); + testJSONFile.delete(); + File testOutFile = testFolder.newFile("testOut.arrow"); + testOutFile.delete(); + + // generate an arrow file + writeInput(testInFile, allocator); + + Integration integration = new Integration(); + + // convert it to json + String[] args1 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // convert back to arrow + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args2); + + // check it is the same + validateOutput(testOutFile, allocator); + + // validate arrow against json + String[] args3 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + integration.run(args3); + } + + @Test + public void testJSONRoundTripWithVariableWidth() throws Exception { + File testJSONFile = new File("../../docs/source/format/integration_json_examples/simple.json").getCanonicalFile(); + if (!testJSONFile.exists()) { + testJSONFile = new File("../docs/source/format/integration_json_examples/simple.json"); + } + File testOutFile = testFolder.newFile("testOut.arrow"); + File testRoundTripJSONFile = testFolder.newFile("testOut.json"); + testOutFile.delete(); + testRoundTripJSONFile.delete(); + + Integration integration = new Integration(); + + // convert to arrow + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args1); + + // convert back to json + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args2); + + BufferedReader orig = readNormalized(testJSONFile); + BufferedReader rt = readNormalized(testRoundTripJSONFile); + String i; + String o; + int j = 0; + while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) { + assertEquals("line: " + j, i, o); + ++j; + } + } + + @Test + public void testJSONRoundTripWithStruct() throws Exception { + File testJSONFile = new File("../../docs/source/format/integration_json_examples/struct.json").getCanonicalFile(); + if (!testJSONFile.exists()) { + testJSONFile = new File("../docs/source/format/integration_json_examples/struct.json"); + } + File testOutFile = testFolder.newFile("testOutStruct.arrow"); + File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json"); + testOutFile.delete(); + testRoundTripJSONFile.delete(); + + Integration integration = new Integration(); + + // convert to arrow + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args1); + + // convert back to json + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args2); + + BufferedReader orig = readNormalized(testJSONFile); + BufferedReader rt = readNormalized(testRoundTripJSONFile); + String i; + String o; + int j = 0; + while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) { + assertEquals("line: " + j, i, o); + ++j; + } + } + + private BufferedReader readNormalized(File f) throws IOException { + Map tree = om.readValue(f.getCanonicalFile(), Map.class); + String normalized = om.writeValueAsString(tree); + return new BufferedReader(new StringReader(normalized)); + } + + /** + * The test should not be sensitive to small variations in float representation. + */ + @Test + public void testFloat() throws Exception { + File testValidInFile = testFolder.newFile("testValidFloatIn.arrow"); + File testInvalidInFile = testFolder.newFile("testAlsoValidFloatIn.arrow"); + File testJSONFile = testFolder.newFile("testValidOut.json"); + testJSONFile.delete(); + + // generate an arrow file + writeInputFloat(testValidInFile, allocator, 912.4140000000002, 912.414); + // generate a different arrow file + writeInputFloat(testInvalidInFile, allocator, 912.414, 912.4140000000002); + + Integration integration = new Integration(); + + // convert the "valid" file to json + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // compare the "invalid" file to the "valid" json + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + // this should fail + integration.run(args3); + } + + @Test + public void testInvalid() throws Exception { + File testValidInFile = testFolder.newFile("testValidIn.arrow"); + File testInvalidInFile = testFolder.newFile("testInvalidIn.arrow"); + File testJSONFile = testFolder.newFile("testInvalidOut.json"); + testJSONFile.delete(); + + // generate an arrow file + writeInput(testValidInFile, allocator); + // generate a different arrow file + writeInput2(testInvalidInFile, allocator); + + Integration integration = new Integration(); + + // convert the "valid" file to json + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // compare the "invalid" file to the "valid" json + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + // this should fail + try { + integration.run(args3); + fail("should have failed"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Different values in column")); + assertTrue(e.getMessage(), e.getMessage().contains("999")); + } + + } +} diff --git a/src/arrow/java/tools/src/test/resources/logback.xml b/src/arrow/java/tools/src/test/resources/logback.xml new file mode 100644 index 000000000..ff848da2a --- /dev/null +++ b/src/arrow/java/tools/src/test/resources/logback.xml @@ -0,0 +1,27 @@ + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + -- cgit v1.2.3