diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/tools | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/tools')
11 files changed, 1559 insertions, 0 deletions
diff --git a/src/arrow/java/tools/pom.xml b/src/arrow/java/tools/pom.xml new file mode 100644 index 000000000..12a19ae2e --- /dev/null +++ b/src/arrow/java/tools/pom.xml @@ -0,0 +1,106 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>6.0.1</version> + </parent> + <artifactId>arrow-tools</artifactId> + <name>Arrow Tools</name> + <description>Java applications for working with Arrow ValueVectors.</description> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + <classifier>${arrow.vector.classifier}</classifier> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-compression</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.2.3</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java new file mode 100644 index 000000000..0ddd1e946 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple server that echoes back data received. + */ +public class EchoServer { + private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class); + private final ServerSocket serverSocket; + private boolean closed = false; + + /** + * Constructs a new instance that binds to the given port. + */ + public EchoServer(int port) throws IOException { + LOGGER.debug("Starting echo server."); + serverSocket = new ServerSocket(port); + LOGGER.debug("Running echo server on port: " + port()); + } + + /** + * Main method to run the server, the first argument is an optional port number. + */ + public static void main(String[] args) throws Exception { + int port; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } else { + port = 8080; + } + new EchoServer(port).run(); + } + + public int port() { + return serverSocket.getLocalPort(); + } + + /** + * Starts the main server event loop. + */ + public void run() throws IOException { + try { + Socket clientSocket = null; + ClientConnection client = null; + while (!closed) { + LOGGER.debug("Waiting to accept new client connection."); + clientSocket = serverSocket.accept(); + LOGGER.debug("Accepted new client connection."); + client = new ClientConnection(clientSocket); + try { + client.run(); + } catch (IOException e) { + LOGGER.warn("Error handling client connection.", e); + } + LOGGER.debug("Closed connection with client"); + } + } catch (java.net.SocketException ex) { + if (!closed) { + throw ex; + } + } finally { + serverSocket.close(); + LOGGER.debug("Server closed."); + } + } + + public void close() throws IOException { + closed = true; + serverSocket.close(); + } + + /** + * Handler for each client connection to the server. + */ + public static class ClientConnection implements AutoCloseable { + public final Socket socket; + + public ClientConnection(Socket socket) { + this.socket = socket; + } + + /** + * Reads a record batch off the socket and writes it back out. + */ + public void run() throws IOException { + // Read the entire input stream and write it back + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + reader.loadNextBatch(); + ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream()); + writer.start(); + int echoed = 0; + while (true) { + int rowCount = reader.getVectorSchemaRoot().getRowCount(); + if (rowCount == 0) { + break; + } else { + writer.writeBatch(); + echoed += rowCount; + reader.loadNextBatch(); + } + } + writer.end(); + Preconditions.checkState(reader.bytesRead() == writer.bytesWritten()); + LOGGER.debug(String.format("Echoed %d records", echoed)); + reader.close(false); + } + } + + @Override + public void close() throws IOException { + socket.close(); + } + } +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java new file mode 100644 index 000000000..c49b04c85 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Application that verifies data can be round-tripped through a file. + */ +public class FileRoundtrip { + private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class); + private final Options options; + private final PrintStream out; + private final PrintStream err; + + FileRoundtrip(PrintStream out, PrintStream err) { + this.out = out; + this.err = err; + this.options = new Options(); + this.options.addOption("i", "in", true, "input file"); + this.options.addOption("o", "out", true, "output file"); + + } + + public static void main(String[] args) { + System.exit(new FileRoundtrip(System.out, System.err).run(args)); + } + + private File validateFile(String type, String fileName) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (!f.exists() || f.isDirectory()) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + return f; + } + + int run(String[] args) { + try { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + String inFileName = cmd.getOptionValue("in"); + String outFileName = cmd.getOptionValue("out"); + + File inFile = validateFile("input", inFileName); + File outFile = validateFile("output", outFileName); + + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(inFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { + + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("Input file size: " + inFile.length()); + LOGGER.debug("Found schema: " + schema); + + try (FileOutputStream fileOutputStream = new FileOutputStream(outFile); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, + fileOutputStream.getChannel())) { + arrowWriter.start(); + while (true) { + if (!arrowReader.loadNextBatch()) { + break; + } else { + arrowWriter.writeBatch(); + } + } + arrowWriter.end(); + } + LOGGER.debug("Output file size: " + outFile.length()); + } + } catch (ParseException e) { + return fatalError("Invalid parameters", e); + } catch (IOException e) { + return fatalError("Error accessing files", e); + } + return 0; + } + + private int fatalError(String message, Throwable e) { + err.println(message); + LOGGER.error(message, e); + return 1; + } + +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java new file mode 100644 index 000000000..bb7cedeb7 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; + +/** + * Converts an Arrow file to an Arrow stream. The file should be specified as the + * first argument and the output is written to standard out. + */ +public class FileToStream { + private FileToStream() {} + + /** + * Reads an Arrow file from in and writes it back to out. + */ + public static void convert(FileInputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries + // only writeBatches if we loaded one in the first place. + boolean writeBatches = reader.loadNextBatch(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) { + writer.start(); + while (writeBatches) { + writer.writeBatch(); + if (!reader.loadNextBatch()) { + break; + } + } + writer.end(); + } + } + } + + /** + * Main method. The first arg is the file path. The second, optional argument, + * is an output file location (defaults to standard out). + */ + public static void main(String[] args) throws IOException { + if (args.length != 1 && args.length != 2) { + System.err.println("Usage: FileToStream <input file> [output file]"); + System.exit(1); + } + + FileInputStream in = new FileInputStream(new File(args[0])); + OutputStream out = args.length == 1 ? + System.out : new FileOutputStream(new File(args[1])); + + convert(in, out); + } +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java new file mode 100644 index 000000000..1db3eeb64 --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.JsonFileReader; +import org.apache.arrow.vector.ipc.JsonFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Validator; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Application for cross language integration testing. + */ +public class Integration { + private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class); + private final Options options; + + Integration() { + this.options = new Options(); + this.options.addOption("a", "arrow", true, "arrow file"); + this.options.addOption("j", "json", true, "json file"); + this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command + .values())); + } + + /** + * Main method. + */ + public static void main(String[] args) { + try { + new Integration().run(args); + } catch (ParseException e) { + fatalError("Invalid parameters", e); + } catch (IOException e) { + fatalError("Error accessing files", e); + } catch (RuntimeException e) { + fatalError("Incompatible files", e); + } + } + + private static void fatalError(String message, Throwable e) { + System.err.println(message); + System.err.println(e.getMessage()); + LOGGER.error(message, e); + System.exit(1); + } + + private File validateFile(String type, String fileName, boolean shouldExist) { + if (fileName == null) { + throw new IllegalArgumentException("missing " + type + " file parameter"); + } + File f = new File(fileName); + if (shouldExist && (!f.exists() || f.isDirectory())) { + throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath()); + } + if (!shouldExist && f.exists()) { + throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath()); + } + return f; + } + + static void extractDictionaryEncodings(List<Field> fields, List<DictionaryEncoding> encodings) { + for (Field field : fields) { + DictionaryEncoding encoding = field.getDictionary(); + if (encoding != null) { + encodings.add(encoding); + } + + extractDictionaryEncodings(field.getChildren(), encodings); + } + } + + void run(String[] args) throws ParseException, IOException { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args, false); + + Command command = toCommand(cmd.getOptionValue("command")); + File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists); + File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists); + command.execute(arrowFile, jsonFile); + } + + private Command toCommand(String commandName) { + try { + return Command.valueOf(commandName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + + Arrays.toString(Command.values())); + } + } + + /** + * Commands (actions) the application can perform. + */ + enum Command { + ARROW_TO_JSON(true, false) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("Input file size: " + arrowFile.length()); + LOGGER.debug("Found schema: " + schema); + try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config() + .pretty(true))) { + writer.start(schema, arrowReader); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to load record batch"); + } + writer.write(root); + } + } + LOGGER.debug("Output file size: " + jsonFile.length()); + } + } + }, + JSON_TO_ARROW(false, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) { + Schema schema = reader.start(); + LOGGER.debug("Input file size: " + jsonFile.length()); + LOGGER.debug("Found schema: " + schema); + try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + // TODO json dictionaries + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, reader, fileOutputStream + .getChannel())) { + arrowWriter.start(); + while (reader.read(root)) { + arrowWriter.writeBatch(); + } + arrowWriter.end(); + } + LOGGER.debug("Output file size: " + arrowFile.length()); + } + } + }, + VALIDATE(true, true) { + @Override + public void execute(File arrowFile, File jsonFile) throws IOException { + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); + FileInputStream fileInputStream = new FileInputStream(arrowFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + allocator, CommonsCompressionFactory.INSTANCE)) { + Schema jsonSchema = jsonReader.start(); + VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); + Schema arrowSchema = arrowRoot.getSchema(); + LOGGER.debug("Arrow Input file size: " + arrowFile.length()); + LOGGER.debug("ARROW schema: " + arrowSchema); + LOGGER.debug("JSON Input file size: " + jsonFile.length()); + LOGGER.debug("JSON schema: " + jsonSchema); + Validator.compareSchemas(jsonSchema, arrowSchema); + + List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks(); + Iterator<ArrowBlock> iterator = recordBatches.iterator(); + VectorSchemaRoot jsonRoot; + int totalBatches = 0; + while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) { + ArrowBlock rbBlock = iterator.next(); + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to load record batch"); + } + Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot); + jsonRoot.close(); + totalBatches++; + } + + // Validate Dictionaries after ArrowFileReader has read batches + List<DictionaryEncoding> encodingsJson = new ArrayList<>(); + extractDictionaryEncodings(jsonSchema.getFields(), encodingsJson); + List<DictionaryEncoding> encodingsArrow = new ArrayList<>(); + extractDictionaryEncodings(arrowSchema.getFields(), encodingsArrow); + Validator.compareDictionaries(encodingsJson, encodingsArrow, jsonReader, arrowReader); + + boolean hasMoreJSON = jsonRoot != null; + boolean hasMoreArrow = iterator.hasNext(); + if (hasMoreJSON || hasMoreArrow) { + throw new IllegalArgumentException("Unexpected RecordBatches. Total: " + totalBatches + + " J:" + hasMoreJSON + " " + + "A:" + hasMoreArrow); + } + } + } + }; + + public final boolean arrowExists; + public final boolean jsonExists; + + Command(boolean arrowExists, boolean jsonExists) { + this.arrowExists = arrowExists; + this.jsonExists = jsonExists; + } + + public abstract void execute(File arrowFile, File jsonFile) throws IOException; + + } + +} diff --git a/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java new file mode 100644 index 000000000..6bd3c2fba --- /dev/null +++ b/src/arrow/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowStreamReader; + +/** + * Converts an Arrow stream to an Arrow file. + */ +public class StreamToFile { + /** + * Reads an Arrow stream from <code>in</code> and writes it to <code>out</code>. + */ + public static void convert(InputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + // load the first batch before instantiating the writer so that we have any dictionaries. + // Only writeBatches if we load the first one. + boolean writeBatches = reader.loadNextBatch(); + try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) { + writer.start(); + while (writeBatches) { + writer.writeBatch(); + if (!reader.loadNextBatch()) { + break; + } + } + writer.end(); + } + } + } + + /** + * Main method. Defaults to reading from standard in and standard out. + * If there are two arguments the first is interpreted as the input file path, + * the second is the output file path. + */ + public static void main(String[] args) throws IOException { + InputStream in = System.in; + OutputStream out = System.out; + if (args.length == 2) { + in = new FileInputStream(new File(args[0])); + out = new FileOutputStream(new File(args[1])); + } + convert(in, out); + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java new file mode 100644 index 000000000..178a0834f --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.message.ArrowBlock; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; + +public class ArrowFileTestFixtures { + static final int COUNT = 10; + + static void writeData(int count, NonNullableStructVector parent) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + for (int i = 0; i < count; i++) { + intWriter.setPosition(i); + intWriter.writeInt(i); + bigIntWriter.setPosition(i); + bigIntWriter.writeBigInt(i); + } + writer.setValueCount(count); + } + + static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception { + // read + try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer + .MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(testOutFile); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), + readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) { + if (!arrowReader.loadRecordBatch(rbBlock)) { + throw new IOException("Expected to read record batch"); + } + validateContent(COUNT, root); + } + } + } + + static void validateContent(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + for (int i = 0; i < count; i++) { + Assert.assertEquals(i, root.getVector("int").getObject(i)); + Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getObject(i)); + } + } + + static void write(FieldVector parent, File file) throws FileNotFoundException, IOException { + VectorSchemaRoot root = new VectorSchemaRoot(parent); + try (FileOutputStream fileOutputStream = new FileOutputStream(file); + ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream + .getChannel())) { + arrowWriter.writeBatch(); + } + } + + + static void writeInput(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + writeData(count, parent); + write(parent.getChild("root"), testInFile); + } + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java new file mode 100644 index 000000000..714cb416b --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static java.util.Arrays.asList; +import static org.apache.arrow.vector.types.Types.MinorType.TINYINT; +import static org.apache.arrow.vector.types.Types.MinorType.VARCHAR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.testing.ValueVectorDataPopulator; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +public class EchoServerTest { + + private static EchoServer server; + private static int serverPort; + private static Thread serverThread; + + @BeforeClass + public static void startEchoServer() throws IOException { + server = new EchoServer(0); + serverPort = server.port(); + serverThread = new Thread() { + @Override + public void run() { + try { + server.run(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + serverThread.start(); + } + + @AfterClass + public static void stopEchoServer() throws IOException, InterruptedException { + server.close(); + serverThread.join(); + } + + private void testEchoServer(int serverPort, + Field field, + TinyIntVector vector, + int batches) + throws UnknownHostException, IOException { + VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); + try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { + writer.start(); + for (int i = 0; i < batches; i++) { + vector.allocateNew(16); + for (int j = 0; j < 8; j++) { + vector.set(j, j + i); + vector.set(j + 8, 0, (byte) (j + i)); + } + vector.setValueCount(16); + root.setRowCount(16); + writer.writeBatch(); + } + writer.end(); + + assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema()); + + TinyIntVector readVector = (TinyIntVector) reader.getVectorSchemaRoot() + .getFieldVectors().get(0); + for (int i = 0; i < batches; i++) { + Assert.assertTrue(reader.loadNextBatch()); + assertEquals(16, reader.getVectorSchemaRoot().getRowCount()); + assertEquals(16, readVector.getValueCount()); + for (int j = 0; j < 8; j++) { + assertEquals(j + i, readVector.get(j)); + assertTrue(readVector.isNull(j + 8)); + } + } + Assert.assertFalse(reader.loadNextBatch()); + assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); + assertEquals(reader.bytesRead(), writer.bytesWritten()); + } + } + + @Test + public void basicTest() throws InterruptedException, IOException { + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + + Field field = new Field( + "testField", + new FieldType(true, new ArrowType.Int(8, true), null, null), + Collections.<Field>emptyList()); + TinyIntVector vector = + new TinyIntVector("testField", FieldType.nullable(TINYINT.getType()), alloc); + Schema schema = new Schema(asList(field)); + + // Try an empty stream, just the header. + testEchoServer(serverPort, field, vector, 0); + + // Try with one batch. + testEchoServer(serverPort, field, vector, 1); + + // Try with a few + testEchoServer(serverPort, field, vector, 10); + } + + @Test + public void testFlatDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + IntVector writeVector = + new IntVector( + "varchar", + new FieldType(true, MinorType.INT.getType(), writeEncoding, null), + allocator); + VarCharVector writeDictionaryVector = + new VarCharVector( + "dict", + FieldType.nullable(VARCHAR.getType()), + allocator)) { + + ValueVectorDataPopulator.setVector(writeVector, 0, 1, null, 2, 1, 2); + ValueVectorDataPopulator.setVector(writeDictionaryVector, "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), "baz".getBytes(StandardCharsets.UTF_8)); + + List<Field> fields = ImmutableList.of(writeVector.getField()); + List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6); + + DictionaryProvider writeProvider = new MapDictionaryProvider( + new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(6, readerRoot.getRowCount()); + + FieldVector readVector = readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + DictionaryEncoding readEncoding = readVector.getField().getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(1L, readEncoding.getId()); + + Assert.assertEquals(6, readVector.getValueCount()); + Assert.assertEquals(0, readVector.getObject(0)); + Assert.assertEquals(1, readVector.getObject(1)); + Assert.assertEquals(null, readVector.getObject(2)); + Assert.assertEquals(2, readVector.getObject(3)); + Assert.assertEquals(1, readVector.getObject(4)); + Assert.assertEquals(2, readVector.getObject(5)); + + Dictionary dictionary = reader.lookup(1L); + Assert.assertNotNull(dictionary); + VarCharVector dictionaryVector = ((VarCharVector) dictionary.getVector()); + Assert.assertEquals(3, dictionaryVector.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1)); + Assert.assertEquals(new Text("baz"), dictionaryVector.getObject(2)); + } + } + } + + @Test + public void testNestedDictionary() throws IOException { + DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + VarCharVector writeDictionaryVector = + new VarCharVector("dictionary", FieldType.nullable(VARCHAR.getType()), allocator); + ListVector writeVector = ListVector.empty("list", allocator)) { + + // data being written: + // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]] + + writeDictionaryVector.allocateNew(); + writeDictionaryVector.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + writeDictionaryVector.setValueCount(2); + + writeVector.addOrGetVector(new FieldType(true, MinorType.INT.getType(), writeEncoding, null)); + writeVector.allocateNew(); + UnionListWriter listWriter = new UnionListWriter(writeVector); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(0); + listWriter.endList(); + listWriter.startList(); + listWriter.writeInt(1); + listWriter.endList(); + listWriter.setValueCount(3); + + List<Field> fields = ImmutableList.of(writeVector.getField()); + List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3); + + DictionaryProvider writeProvider = new MapDictionaryProvider( + new Dictionary(writeDictionaryVector, writeEncoding)); + + try (Socket socket = new Socket("localhost", serverPort); + ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket + .getOutputStream()); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + writer.start(); + writer.writeBatch(); + writer.end(); + + reader.loadNextBatch(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + Assert.assertEquals(3, readerRoot.getRowCount()); + + ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0); + Assert.assertNotNull(readVector); + + Assert.assertNull(readVector.getField().getDictionary()); + DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0) + .getDictionary(); + Assert.assertNotNull(readEncoding); + Assert.assertEquals(2L, readEncoding.getId()); + + Field nestedField = readVector.getField().getChildren().get(0); + + DictionaryEncoding encoding = nestedField.getDictionary(); + Assert.assertNotNull(encoding); + Assert.assertEquals(2L, encoding.getId()); + Assert.assertEquals(new Int(32, true), encoding.getIndexType()); + + Assert.assertEquals(3, readVector.getValueCount()); + Assert.assertEquals(Arrays.asList(0, 1), readVector.getObject(0)); + Assert.assertEquals(Arrays.asList(0), readVector.getObject(1)); + Assert.assertEquals(Arrays.asList(1), readVector.getObject(2)); + + Dictionary readDictionary = reader.lookup(2L); + Assert.assertNotNull(readDictionary); + VarCharVector dictionaryVector = ((VarCharVector) readDictionary.getVector()); + Assert.assertEquals(2, dictionaryVector.getValueCount()); + Assert.assertEquals(new Text("foo"), dictionaryVector.getObject(0)); + Assert.assertEquals(new Text("bar"), dictionaryVector.getObject(1)); + } + } + } +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java new file mode 100644 index 000000000..ddac6f793 --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestFileRoundtrip { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void test() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testOutFile = testFolder.newFile("testOut.arrow"); + + writeInput(testInFile, allocator); + + String[] args = {"-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()}; + int result = new FileRoundtrip(System.out, System.err).run(args); + assertEquals(0, result); + + validateOutput(testOutFile, allocator); + } + +} diff --git a/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java new file mode 100644 index 000000000..1232c6c1d --- /dev/null +++ b/src/arrow/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.tools; + +import static org.apache.arrow.tools.ArrowFileTestFixtures.validateOutput; +import static org.apache.arrow.tools.ArrowFileTestFixtures.write; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeData; +import static org.apache.arrow.tools.ArrowFileTestFixtures.writeInput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.util.Map; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.tools.Integration.Command; +import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; +import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.arrow.vector.complex.writer.Float8Writer; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +public class TestIntegration { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + private ObjectMapper om = new ObjectMapper(); + + { + DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); + prettyPrinter.indentArraysWith(NopIndenter.instance); + om.setDefaultPrettyPrinter(prettyPrinter); + om.enable(SerializationFeature.INDENT_OUTPUT); + om.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + } + + static void writeInputFloat(File testInFile, BufferAllocator allocator, double... f) throws + FileNotFoundException, IOException { + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + Float8Writer floatWriter = rootWriter.float8("float"); + for (int i = 0; i < f.length; i++) { + floatWriter.setPosition(i); + floatWriter.writeFloat8(f[i]); + } + writer.setValueCount(f.length); + write(parent.getChild("root"), testInFile); + } + } + + static void writeInput2(File testInFile, BufferAllocator allocator) throws + FileNotFoundException, IOException { + int count = ArrowFileTestFixtures.COUNT; + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, + Integer.MAX_VALUE); + NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { + writeData(count, parent); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + StructWriter rootWriter = writer.rootAsStruct(); + IntWriter intWriter = rootWriter.integer("int"); + BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt"); + intWriter.setPosition(5); + intWriter.writeInt(999); + bigIntWriter.setPosition(4); + bigIntWriter.writeBigInt(777L); + writer.setValueCount(count); + write(parent.getChild("root"), testInFile); + } + } + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + @Test + public void testValid() throws Exception { + File testInFile = testFolder.newFile("testIn.arrow"); + File testJSONFile = testFolder.newFile("testOut.json"); + testJSONFile.delete(); + File testOutFile = testFolder.newFile("testOut.arrow"); + testOutFile.delete(); + + // generate an arrow file + writeInput(testInFile, allocator); + + Integration integration = new Integration(); + + // convert it to json + String[] args1 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // convert back to arrow + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args2); + + // check it is the same + validateOutput(testOutFile, allocator); + + // validate arrow against json + String[] args3 = {"-arrow", testInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + integration.run(args3); + } + + @Test + public void testJSONRoundTripWithVariableWidth() throws Exception { + File testJSONFile = new File("../../docs/source/format/integration_json_examples/simple.json").getCanonicalFile(); + if (!testJSONFile.exists()) { + testJSONFile = new File("../docs/source/format/integration_json_examples/simple.json"); + } + File testOutFile = testFolder.newFile("testOut.arrow"); + File testRoundTripJSONFile = testFolder.newFile("testOut.json"); + testOutFile.delete(); + testRoundTripJSONFile.delete(); + + Integration integration = new Integration(); + + // convert to arrow + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args1); + + // convert back to json + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args2); + + BufferedReader orig = readNormalized(testJSONFile); + BufferedReader rt = readNormalized(testRoundTripJSONFile); + String i; + String o; + int j = 0; + while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) { + assertEquals("line: " + j, i, o); + ++j; + } + } + + @Test + public void testJSONRoundTripWithStruct() throws Exception { + File testJSONFile = new File("../../docs/source/format/integration_json_examples/struct.json").getCanonicalFile(); + if (!testJSONFile.exists()) { + testJSONFile = new File("../docs/source/format/integration_json_examples/struct.json"); + } + File testOutFile = testFolder.newFile("testOutStruct.arrow"); + File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json"); + testOutFile.delete(); + testRoundTripJSONFile.delete(); + + Integration integration = new Integration(); + + // convert to arrow + String[] args1 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()}; + integration.run(args1); + + // convert back to json + String[] args2 = {"-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args2); + + BufferedReader orig = readNormalized(testJSONFile); + BufferedReader rt = readNormalized(testRoundTripJSONFile); + String i; + String o; + int j = 0; + while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) { + assertEquals("line: " + j, i, o); + ++j; + } + } + + private BufferedReader readNormalized(File f) throws IOException { + Map<?, ?> tree = om.readValue(f.getCanonicalFile(), Map.class); + String normalized = om.writeValueAsString(tree); + return new BufferedReader(new StringReader(normalized)); + } + + /** + * The test should not be sensitive to small variations in float representation. + */ + @Test + public void testFloat() throws Exception { + File testValidInFile = testFolder.newFile("testValidFloatIn.arrow"); + File testInvalidInFile = testFolder.newFile("testAlsoValidFloatIn.arrow"); + File testJSONFile = testFolder.newFile("testValidOut.json"); + testJSONFile.delete(); + + // generate an arrow file + writeInputFloat(testValidInFile, allocator, 912.4140000000002, 912.414); + // generate a different arrow file + writeInputFloat(testInvalidInFile, allocator, 912.414, 912.4140000000002); + + Integration integration = new Integration(); + + // convert the "valid" file to json + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // compare the "invalid" file to the "valid" json + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + // this should fail + integration.run(args3); + } + + @Test + public void testInvalid() throws Exception { + File testValidInFile = testFolder.newFile("testValidIn.arrow"); + File testInvalidInFile = testFolder.newFile("testInvalidIn.arrow"); + File testJSONFile = testFolder.newFile("testInvalidOut.json"); + testJSONFile.delete(); + + // generate an arrow file + writeInput(testValidInFile, allocator); + // generate a different arrow file + writeInput2(testInvalidInFile, allocator); + + Integration integration = new Integration(); + + // convert the "valid" file to json + String[] args1 = {"-arrow", testValidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()}; + integration.run(args1); + + // compare the "invalid" file to the "valid" json + String[] args3 = {"-arrow", testInvalidInFile.getAbsolutePath(), "-json", testJSONFile + .getAbsolutePath(), "-command", Command.VALIDATE.name()}; + // this should fail + try { + integration.run(args3); + fail("should have failed"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Different values in column")); + assertTrue(e.getMessage(), e.getMessage().contains("999")); + } + + } +} diff --git a/src/arrow/java/tools/src/test/resources/logback.xml b/src/arrow/java/tools/src/test/resources/logback.xml new file mode 100644 index 000000000..ff848da2a --- /dev/null +++ b/src/arrow/java/tools/src/test/resources/logback.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + <logger name="org.apache.arrow" additivity="false"> + <level value="info" /> + <appender-ref ref="STDOUT" /> + </logger> + +</configuration> |