summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/adapter/avro
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/java/adapter/avro')
-rw-r--r--src/arrow/java/adapter/avro/pom.xml59
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java67
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java86
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java74
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java805
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java186
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java74
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java49
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java42
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java46
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java42
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java42
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java42
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java79
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java39
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java48
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java76
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java86
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java65
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java73
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java71
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java67
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java30
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java88
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java43
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java201
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java626
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java228
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java313
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java477
-rw-r--r--src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java93
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc24
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc24
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc37
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc26
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc26
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc26
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc29
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc26
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc38
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc34
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc26
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc45
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc28
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc75
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc35
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc25
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc23
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc22
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc27
-rw-r--r--src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc25
100 files changed, 6275 insertions, 0 deletions
diff --git a/src/arrow/java/adapter/avro/pom.xml b/src/arrow/java/adapter/avro/pom.xml
new file mode 100644
index 000000000..1f3fea849
--- /dev/null
+++ b/src/arrow/java/adapter/avro/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-java-root</artifactId>
+ <version>6.0.1</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>arrow-avro</artifactId>
+ <name>Arrow AVRO Adapter</name>
+ <description>(Contrib/Experimental) A library for converting Avro data to Arrow data.</description>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-core -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${dep.avro.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
new file mode 100644
index 000000000..9fb5ce291
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import java.io.IOException;
+
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Utility class to convert Avro objects to columnar Arrow format objects.
+ */
+public class AvroToArrow {
+
+ /**
+ * Fetch the data from {@link Decoder} and convert it to Arrow objects.
+ * Only for testing purpose.
+ * @param schema avro schema.
+ * @param decoder avro decoder
+ * @param config configuration of the conversion.
+ * @return Arrow Data Objects {@link VectorSchemaRoot}
+ */
+ static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, AvroToArrowConfig config)
+ throws IOException {
+ Preconditions.checkNotNull(schema, "Avro schema object can not be null");
+ Preconditions.checkNotNull(decoder, "Avro decoder object can not be null");
+ Preconditions.checkNotNull(config, "config can not be null");
+
+ return AvroToArrowUtils.avroToArrowVectors(schema, decoder, config);
+ }
+
+ /**
+ * Fetch the data from {@link Decoder} and iteratively convert it to Arrow objects.
+ * @param schema avro schema
+ * @param decoder avro decoder
+ * @param config configuration of the conversion.
+ * @throws IOException on error
+ */
+ public static AvroToArrowVectorIterator avroToArrowIterator(
+ Schema schema,
+ Decoder decoder,
+ AvroToArrowConfig config) throws IOException {
+
+ Preconditions.checkNotNull(schema, "Avro schema object can not be null");
+ Preconditions.checkNotNull(decoder, "Avro decoder object can not be null");
+ Preconditions.checkNotNull(config, "config can not be null");
+
+ return AvroToArrowVectorIterator.create(decoder, schema, config);
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java
new file mode 100644
index 000000000..4f59ef384
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import java.util.Set;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+
+/**
+ * This class configures the Avro-to-Arrow conversion process.
+ */
+public class AvroToArrowConfig {
+
+ private final BufferAllocator allocator;
+ /**
+ * The maximum rowCount to read each time when partially convert data.
+ * Default value is 1024 and -1 means read all data into one vector.
+ */
+ private final int targetBatchSize;
+
+ /**
+ * The dictionary provider used for enum type.
+ * If avro schema has enum type, will create dictionary and update this provider.
+ */
+ private final DictionaryProvider.MapDictionaryProvider provider;
+
+ /**
+ * The field names which to skip when reading decoder values.
+ */
+ private final Set<String> skipFieldNames;
+
+ /**
+ * Instantiate an instance.
+ * @param allocator The memory allocator to construct the Arrow vectors with.
+ * @param targetBatchSize The maximum rowCount to read each time when partially convert data.
+ * @param provider The dictionary provider used for enum type, adapter will update this provider.
+ * @param skipFieldNames Field names which to skip.
+ */
+ AvroToArrowConfig(
+ BufferAllocator allocator,
+ int targetBatchSize,
+ DictionaryProvider.MapDictionaryProvider provider,
+ Set<String> skipFieldNames) {
+
+ Preconditions.checkArgument(targetBatchSize == AvroToArrowVectorIterator.NO_LIMIT_BATCH_SIZE ||
+ targetBatchSize > 0, "invalid targetBatchSize: %s", targetBatchSize);
+
+ this.allocator = allocator;
+ this.targetBatchSize = targetBatchSize;
+ this.provider = provider;
+ this.skipFieldNames = skipFieldNames;
+ }
+
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ public int getTargetBatchSize() {
+ return targetBatchSize;
+ }
+
+ public DictionaryProvider.MapDictionaryProvider getProvider() {
+ return provider;
+ }
+
+ public Set<String> getSkipFieldNames() {
+ return skipFieldNames;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java
new file mode 100644
index 000000000..474c1eb5c
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+
+/**
+ * This class builds {@link AvroToArrowConfig}s.
+ */
+public class AvroToArrowConfigBuilder {
+
+ private BufferAllocator allocator;
+
+ private int targetBatchSize;
+
+ private DictionaryProvider.MapDictionaryProvider provider;
+
+ private Set<String> skipFieldNames;
+
+ /**
+ * Default constructor for the {@link AvroToArrowConfigBuilder}.
+ */
+ public AvroToArrowConfigBuilder(BufferAllocator allocator) {
+ this.allocator = allocator;
+ this.targetBatchSize = AvroToArrowVectorIterator.DEFAULT_BATCH_SIZE;
+ this.provider = new DictionaryProvider.MapDictionaryProvider();
+ this.skipFieldNames = new HashSet<>();
+ }
+
+ public AvroToArrowConfigBuilder setTargetBatchSize(int targetBatchSize) {
+ this.targetBatchSize = targetBatchSize;
+ return this;
+ }
+
+ public AvroToArrowConfigBuilder setProvider(DictionaryProvider.MapDictionaryProvider provider) {
+ this.provider = provider;
+ return this;
+ }
+
+ public AvroToArrowConfigBuilder setSkipFieldNames(Set<String> skipFieldNames) {
+ this.skipFieldNames = skipFieldNames;
+ return this;
+ }
+
+ /**
+ * This builds the {@link AvroToArrowConfig} from the provided params.
+ */
+ public AvroToArrowConfig build() {
+ return new AvroToArrowConfig(
+ allocator,
+ targetBatchSize,
+ provider,
+ skipFieldNames);
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
new file mode 100644
index 000000000..80293c8b8
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
+import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.consumers.AvroArraysConsumer;
+import org.apache.arrow.consumers.AvroBooleanConsumer;
+import org.apache.arrow.consumers.AvroBytesConsumer;
+import org.apache.arrow.consumers.AvroDoubleConsumer;
+import org.apache.arrow.consumers.AvroEnumConsumer;
+import org.apache.arrow.consumers.AvroFixedConsumer;
+import org.apache.arrow.consumers.AvroFloatConsumer;
+import org.apache.arrow.consumers.AvroIntConsumer;
+import org.apache.arrow.consumers.AvroLongConsumer;
+import org.apache.arrow.consumers.AvroMapConsumer;
+import org.apache.arrow.consumers.AvroNullConsumer;
+import org.apache.arrow.consumers.AvroStringConsumer;
+import org.apache.arrow.consumers.AvroStructConsumer;
+import org.apache.arrow.consumers.AvroUnionsConsumer;
+import org.apache.arrow.consumers.CompositeAvroConsumer;
+import org.apache.arrow.consumers.Consumer;
+import org.apache.arrow.consumers.SkipConsumer;
+import org.apache.arrow.consumers.SkipFunction;
+import org.apache.arrow.consumers.logical.AvroDateConsumer;
+import org.apache.arrow.consumers.logical.AvroDecimalConsumer;
+import org.apache.arrow.consumers.logical.AvroTimeMicroConsumer;
+import org.apache.arrow.consumers.logical.AvroTimeMillisConsumer;
+import org.apache.arrow.consumers.logical.AvroTimestampMicrosConsumer;
+import org.apache.arrow.consumers.logical.AvroTimestampMillisConsumer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.UnionMode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.ValueVectorUtility;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects.
+ */
+public class AvroToArrowUtils {
+
+ /**
+ * Creates a {@link Consumer} from the {@link Schema}
+ *
+ <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
+ *
+ * <ul>
+ * <li>STRING --> ArrowType.Utf8</li>
+ * <li>INT --> ArrowType.Int(32, signed)</li>
+ * <li>LONG --> ArrowType.Int(64, signed)</li>
+ * <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+ * <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
+ * <li>BOOLEAN --> ArrowType.Bool</li>
+ * <li>BYTES --> ArrowType.Binary</li>
+ * <li>ARRAY --> ArrowType.List</li>
+ * <li>MAP --> ArrowType.Map</li>
+ * <li>FIXED --> ArrowType.FixedSizeBinary</li>
+ * <li>RECORD --> ArrowType.Struct</li>
+ * <li>UNION --> ArrowType.Union</li>
+ * <li>ENUM--> ArrowType.Int</li>
+ * <li>DECIMAL --> ArrowType.Decimal</li>
+ * <li>Date --> ArrowType.Date(DateUnit.DAY)</li>
+ * <li>TimeMillis --> ArrowType.Time(TimeUnit.MILLISECOND, 32)</li>
+ * <li>TimeMicros --> ArrowType.Time(TimeUnit.MICROSECOND, 64)</li>
+ * <li>TimestampMillis --> ArrowType.Timestamp(TimeUnit.MILLISECOND, null)</li>
+ * <li>TimestampMicros --> ArrowType.Timestamp(TimeUnit.MICROSECOND, null)</li>
+ * </ul>
+ */
+
+ private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config) {
+ return createConsumer(schema, name, false, config, null);
+ }
+
+ private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config, FieldVector vector) {
+ return createConsumer(schema, name, false, config, vector);
+ }
+
+ /**
+ * Create a consumer with the given Avro schema.
+ *
+ * @param schema avro schema
+ * @param name arrow field name
+ * @param consumerVector vector to keep in consumer, if v == null, will create a new vector via field.
+ * @return consumer
+ */
+ private static Consumer createConsumer(
+ Schema schema,
+ String name,
+ boolean nullable,
+ AvroToArrowConfig config,
+ FieldVector consumerVector) {
+
+ Preconditions.checkNotNull(schema, "Avro schema object can't be null");
+ Preconditions.checkNotNull(config, "Config can't be null");
+
+ final BufferAllocator allocator = config.getAllocator();
+
+ final Type type = schema.getType();
+ final LogicalType logicalType = schema.getLogicalType();
+
+ final ArrowType arrowType;
+ final FieldType fieldType;
+ final FieldVector vector;
+ final Consumer consumer;
+
+ switch (type) {
+ case UNION:
+ consumer = createUnionConsumer(schema, name, config, consumerVector);
+ break;
+ case ARRAY:
+ consumer = createArrayConsumer(schema, name, config, consumerVector);
+ break;
+ case MAP:
+ consumer = createMapConsumer(schema, name, config, consumerVector);
+ break;
+ case RECORD:
+ consumer = createStructConsumer(schema, name, config, consumerVector);
+ break;
+ case ENUM:
+ consumer = createEnumConsumer(schema, name, config, consumerVector);
+ break;
+ case STRING:
+ arrowType = new ArrowType.Utf8();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroStringConsumer((VarCharVector) vector);
+ break;
+ case FIXED:
+ Map<String, String> extProps = createExternalProps(schema);
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroDecimalConsumer.FixedDecimalConsumer((DecimalVector) vector, schema.getFixedSize());
+ } else {
+ arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize());
+ }
+ break;
+ case INT:
+ if (logicalType instanceof LogicalTypes.Date) {
+ arrowType = new ArrowType.Date(DateUnit.DAY);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroDateConsumer((DateDayVector) vector);
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector);
+ } else {
+ arrowType = new ArrowType.Int(32, /*signed=*/true);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroIntConsumer((IntVector) vector);
+ }
+ break;
+ case BOOLEAN:
+ arrowType = new ArrowType.Bool();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroBooleanConsumer((BitVector) vector);
+ break;
+ case LONG:
+ if (logicalType instanceof LogicalTypes.TimeMicros) {
+ arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector);
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector);
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector);
+ } else {
+ arrowType = new ArrowType.Int(64, /*signed=*/true);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroLongConsumer((BigIntVector) vector);
+ }
+ break;
+ case FLOAT:
+ arrowType = new ArrowType.FloatingPoint(SINGLE);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroFloatConsumer((Float4Vector) vector);
+ break;
+ case DOUBLE:
+ arrowType = new ArrowType.FloatingPoint(DOUBLE);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroDoubleConsumer((Float8Vector) vector);
+ break;
+ case BYTES:
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector);
+ } else {
+ arrowType = new ArrowType.Binary();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(consumerVector, fieldType, name, allocator);
+ consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+ }
+ break;
+ case NULL:
+ arrowType = new ArrowType.Null();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallback=*/null);
+ consumer = new AvroNullConsumer((NullVector) vector);
+ break;
+ default:
+ // no-op, shouldn't get here
+ throw new UnsupportedOperationException("Can't convert avro type %s to arrow type." + type.getName());
+ }
+ return consumer;
+ }
+
+ private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType) {
+ final int scale = logicalType.getScale();
+ final int precision = logicalType.getPrecision();
+ Preconditions.checkArgument(precision > 0 && precision <= 38,
+ "Precision must be in range of 1 to 38");
+ Preconditions.checkArgument(scale >= 0 && scale <= 38,
+ "Scale must be in range of 0 to 38.");
+ Preconditions.checkArgument(scale <= precision,
+ "Invalid decimal scale: %s (greater than precision: %s)", scale, precision);
+
+ return new ArrowType.Decimal(precision, scale, 128);
+
+ }
+
+ private static Consumer createSkipConsumer(Schema schema) {
+
+ SkipFunction skipFunction;
+ Type type = schema.getType();
+
+ switch (type) {
+ case UNION:
+ List<Consumer> unionDelegates = schema.getTypes().stream().map(s ->
+ createSkipConsumer(s)).collect(Collectors.toList());
+ skipFunction = decoder -> unionDelegates.get(decoder.readInt()).consume(decoder);
+
+ break;
+ case ARRAY:
+ Consumer elementDelegate = createSkipConsumer(schema.getElementType());
+ skipFunction = decoder -> {
+ for (long i = decoder.skipArray(); i != 0; i = decoder.skipArray()) {
+ for (long j = 0; j < i; j++) {
+ elementDelegate.consume(decoder);
+ }
+ }
+ };
+ break;
+ case MAP:
+ Consumer valueDelegate = createSkipConsumer(schema.getValueType());
+ skipFunction = decoder -> {
+ for (long i = decoder.skipMap(); i != 0; i = decoder.skipMap()) {
+ for (long j = 0; j < i; j++) {
+ decoder.skipString(); // Discard key
+ valueDelegate.consume(decoder);
+ }
+ }
+ };
+ break;
+ case RECORD:
+ List<Consumer> delegates = schema.getFields().stream().map(field ->
+ createSkipConsumer(field.schema())).collect(Collectors.toList());
+
+ skipFunction = decoder -> {
+ for (Consumer consumer : delegates) {
+ consumer.consume(decoder);
+ }
+ };
+
+ break;
+ case ENUM:
+ skipFunction = decoder -> decoder.readEnum();
+ break;
+ case STRING:
+ skipFunction = decoder -> decoder.skipString();
+ break;
+ case FIXED:
+ skipFunction = decoder -> decoder.skipFixed(schema.getFixedSize());
+ break;
+ case INT:
+ skipFunction = decoder -> decoder.readInt();
+ break;
+ case BOOLEAN:
+ skipFunction = decoder -> decoder.skipFixed(1);
+ break;
+ case LONG:
+ skipFunction = decoder -> decoder.readLong();
+ break;
+ case FLOAT:
+ skipFunction = decoder -> decoder.readFloat();
+ break;
+ case DOUBLE:
+ skipFunction = decoder -> decoder.readDouble();
+ break;
+ case BYTES:
+ skipFunction = decoder -> decoder.skipBytes();
+ break;
+ case NULL:
+ skipFunction = decoder -> { };
+ break;
+ default:
+ // no-op, shouldn't get here
+ throw new UnsupportedOperationException("Invalid avro type: " + type.getName());
+ }
+
+ return new SkipConsumer(skipFunction);
+ }
+
+ static CompositeAvroConsumer createCompositeConsumer(
+ Schema schema, AvroToArrowConfig config) {
+
+ List<Consumer> consumers = new ArrayList<>();
+ final Set<String> skipFieldNames = config.getSkipFieldNames();
+
+ Schema.Type type = schema.getType();
+ if (type == Type.RECORD) {
+ for (Schema.Field field : schema.getFields()) {
+ if (skipFieldNames.contains(field.name())) {
+ consumers.add(createSkipConsumer(field.schema()));
+ } else {
+ Consumer consumer = createConsumer(field.schema(), field.name(), config);
+ consumers.add(consumer);
+ }
+
+ }
+ } else {
+ Consumer consumer = createConsumer(schema, "", config);
+ consumers.add(consumer);
+ }
+
+ return new CompositeAvroConsumer(consumers);
+ }
+
+ private static FieldVector createVector(FieldVector consumerVector, FieldType fieldType,
+ String name, BufferAllocator allocator) {
+ return consumerVector != null ? consumerVector : fieldType.createNewSingleVector(name, allocator, null);
+ }
+
+ private static String getDefaultFieldName(ArrowType type) {
+ Types.MinorType minorType = Types.getMinorTypeForArrowType(type);
+ return minorType.name().toLowerCase();
+ }
+
+ private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) {
+ return avroSchemaToField(schema, name, config, null);
+ }
+
+ private static Field avroSchemaToField(
+ Schema schema,
+ String name,
+ AvroToArrowConfig config,
+ Map<String, String> externalProps) {
+
+ final Type type = schema.getType();
+ final LogicalType logicalType = schema.getLogicalType();
+ final List<Field> children = new ArrayList<>();
+ final FieldType fieldType;
+
+ switch (type) {
+ case UNION:
+ for (int i = 0; i < schema.getTypes().size(); i++) {
+ Schema childSchema = schema.getTypes().get(i);
+ // Union child vector should use default name
+ children.add(avroSchemaToField(childSchema, null, config));
+ }
+ fieldType = createFieldType(new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps);
+ break;
+ case ARRAY:
+ Schema elementSchema = schema.getElementType();
+ children.add(avroSchemaToField(elementSchema, elementSchema.getName(), config));
+ fieldType = createFieldType(new ArrowType.List(), schema, externalProps);
+ break;
+ case MAP:
+ // MapVector internal struct field and key field should be non-nullable
+ FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null);
+ Field keyField = new Field("key", keyFieldType, /*children=*/null);
+ Field valueField = avroSchemaToField(schema.getValueType(), "value", config);
+
+ FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null);
+ Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField));
+ children.add(structField);
+ fieldType = createFieldType(new ArrowType.Map(/*keySorted=*/false), schema, externalProps);
+ break;
+ case RECORD:
+ final Set<String> skipFieldNames = config.getSkipFieldNames();
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ final Schema.Field field = schema.getFields().get(i);
+ Schema childSchema = field.schema();
+ String fullChildName = String.format("%s.%s", name, field.name());
+ if (!skipFieldNames.contains(fullChildName)) {
+ final Map<String, String> extProps = new HashMap<>();
+ String doc = field.doc();
+ Set<String> aliases = field.aliases();
+ if (doc != null) {
+ extProps.put("doc", doc);
+ }
+ if (aliases != null) {
+ extProps.put("aliases", convertAliases(aliases));
+ }
+ children.add(avroSchemaToField(childSchema, fullChildName, config, extProps));
+ }
+ }
+ fieldType = createFieldType(new ArrowType.Struct(), schema, externalProps);
+ break;
+ case ENUM:
+ DictionaryProvider.MapDictionaryProvider provider = config.getProvider();
+ int current = provider.getDictionaryIds().size();
+ int enumCount = schema.getEnumSymbols().size();
+ ArrowType.Int indexType = DictionaryEncoder.getIndexType(enumCount);
+
+ fieldType = createFieldType(indexType, schema, externalProps,
+ new DictionaryEncoding(current, /*ordered=*/false, /*indexType=*/indexType));
+ break;
+
+ case STRING:
+ fieldType = createFieldType(new ArrowType.Utf8(), schema, externalProps);
+ break;
+ case FIXED:
+ final ArrowType fixedArrowType;
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
+ } else {
+ fixedArrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
+ }
+ fieldType = createFieldType(fixedArrowType, schema, externalProps);
+ break;
+ case INT:
+ final ArrowType intArrowType;
+ if (logicalType instanceof LogicalTypes.Date) {
+ intArrowType = new ArrowType.Date(DateUnit.DAY);
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ intArrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ } else {
+ intArrowType = new ArrowType.Int(32, /*signed=*/true);
+ }
+ fieldType = createFieldType(intArrowType, schema, externalProps);
+ break;
+ case BOOLEAN:
+ fieldType = createFieldType(new ArrowType.Bool(), schema, externalProps);
+ break;
+ case LONG:
+ final ArrowType longArrowType;
+ if (logicalType instanceof LogicalTypes.TimeMicros) {
+ longArrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ } else {
+ longArrowType = new ArrowType.Int(64, /*signed=*/true);
+ }
+ fieldType = createFieldType(longArrowType, schema, externalProps);
+ break;
+ case FLOAT:
+ fieldType = createFieldType(new ArrowType.FloatingPoint(SINGLE), schema, externalProps);
+ break;
+ case DOUBLE:
+ fieldType = createFieldType(new ArrowType.FloatingPoint(DOUBLE), schema, externalProps);
+ break;
+ case BYTES:
+ final ArrowType bytesArrowType;
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
+ } else {
+ bytesArrowType = new ArrowType.Binary();
+ }
+ fieldType = createFieldType(bytesArrowType, schema, externalProps);
+ break;
+ case NULL:
+ fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps);
+ break;
+ default:
+ // no-op, shouldn't get here
+ throw new UnsupportedOperationException();
+ }
+
+ if (name == null) {
+ name = getDefaultFieldName(fieldType.getType());
+ }
+ return new Field(name, fieldType, children.size() == 0 ? null : children);
+ }
+
+ private static Consumer createArrayConsumer(Schema schema, String name, AvroToArrowConfig config,
+ FieldVector consumerVector) {
+
+ ListVector listVector;
+ if (consumerVector == null) {
+ final Field field = avroSchemaToField(schema, name, config);
+ listVector = (ListVector) field.createVector(config.getAllocator());
+ } else {
+ listVector = (ListVector) consumerVector;
+ }
+
+ FieldVector dataVector = listVector.getDataVector();
+
+ // create delegate
+ Schema childSchema = schema.getElementType();
+ Consumer delegate = createConsumer(childSchema, childSchema.getName(), config, dataVector);
+
+ return new AvroArraysConsumer(listVector, delegate);
+ }
+
+ private static Consumer createStructConsumer(Schema schema, String name, AvroToArrowConfig config,
+ FieldVector consumerVector) {
+
+ final Set<String> skipFieldNames = config.getSkipFieldNames();
+
+ StructVector structVector;
+ if (consumerVector == null) {
+ final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema));
+ structVector = (StructVector) field.createVector(config.getAllocator());
+ } else {
+ structVector = (StructVector) consumerVector;
+ }
+
+ Consumer[] delegates = new Consumer[schema.getFields().size()];
+ int vectorIndex = 0;
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ Schema.Field childField = schema.getFields().get(i);
+ Consumer delegate;
+ // use full name to distinguish fields have same names between parent and child fields.
+ final String fullChildName = String.format("%s.%s", name, childField.name());
+ if (skipFieldNames.contains(fullChildName)) {
+ delegate = createSkipConsumer(childField.schema());
+ } else {
+ delegate = createConsumer(childField.schema(), fullChildName, config,
+ structVector.getChildrenFromFields().get(vectorIndex++));
+ }
+
+ delegates[i] = delegate;
+ }
+
+ return new AvroStructConsumer(structVector, delegates);
+
+ }
+
+ private static Consumer createEnumConsumer(Schema schema, String name, AvroToArrowConfig config,
+ FieldVector consumerVector) {
+
+ BaseIntVector indexVector;
+ if (consumerVector == null) {
+ final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema));
+ indexVector = (BaseIntVector) field.createVector(config.getAllocator());
+ } else {
+ indexVector = (BaseIntVector) consumerVector;
+ }
+
+ final int valueCount = schema.getEnumSymbols().size();
+ VarCharVector dictVector = new VarCharVector(name, config.getAllocator());
+ dictVector.allocateNewSafe();
+ dictVector.setValueCount(valueCount);
+ for (int i = 0; i < valueCount; i++) {
+ dictVector.set(i, schema.getEnumSymbols().get(i).getBytes(StandardCharsets.UTF_8));
+ }
+ Dictionary dictionary =
+ new Dictionary(dictVector, indexVector.getField().getDictionary());
+ config.getProvider().put(dictionary);
+
+ return new AvroEnumConsumer(indexVector);
+
+ }
+
+ private static Consumer createMapConsumer(Schema schema, String name, AvroToArrowConfig config,
+ FieldVector consumerVector) {
+
+ MapVector mapVector;
+ if (consumerVector == null) {
+ final Field field = avroSchemaToField(schema, name, config);
+ mapVector = (MapVector) field.createVector(config.getAllocator());
+ } else {
+ mapVector = (MapVector) consumerVector;
+ }
+
+ // create delegate struct consumer
+ StructVector structVector = (StructVector) mapVector.getDataVector();
+
+ // keys in avro map are always assumed to be strings.
+ Consumer keyConsumer = new AvroStringConsumer(
+ (VarCharVector) structVector.getChildrenFromFields().get(0));
+ Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(),
+ config, structVector.getChildrenFromFields().get(1));
+
+ AvroStructConsumer internalConsumer =
+ new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer});
+
+ return new AvroMapConsumer(mapVector, internalConsumer);
+ }
+
+ private static Consumer createUnionConsumer(Schema schema, String name, AvroToArrowConfig config,
+ FieldVector consumerVector) {
+ final int size = schema.getTypes().size();
+
+ final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Type.NULL);
+
+ UnionVector unionVector;
+ if (consumerVector == null) {
+ final Field field = avroSchemaToField(schema, name, config);
+ unionVector = (UnionVector) field.createVector(config.getAllocator());
+ } else {
+ unionVector = (UnionVector) consumerVector;
+ }
+
+ List<FieldVector> childVectors = unionVector.getChildrenFromFields();
+
+ Consumer[] delegates = new Consumer[size];
+ Types.MinorType[] types = new Types.MinorType[size];
+
+ for (int i = 0; i < size; i++) {
+ FieldVector child = childVectors.get(i);
+ Schema subSchema = schema.getTypes().get(i);
+ Consumer delegate = createConsumer(subSchema, subSchema.getName(), nullable, config, child);
+ delegates[i] = delegate;
+ types[i] = child.getMinorType();
+ }
+ return new AvroUnionsConsumer(unionVector, delegates, types);
+ }
+
+ /**
+ * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}.
+ * @param schema avro schema
+ * @param decoder avro decoder to read data from
+ */
+ static VectorSchemaRoot avroToArrowVectors(
+ Schema schema,
+ Decoder decoder,
+ AvroToArrowConfig config)
+ throws IOException {
+
+ List<FieldVector> vectors = new ArrayList<>();
+ List<Consumer> consumers = new ArrayList<>();
+ final Set<String> skipFieldNames = config.getSkipFieldNames();
+
+ Schema.Type type = schema.getType();
+ if (type == Type.RECORD) {
+ for (Schema.Field field : schema.getFields()) {
+ if (skipFieldNames.contains(field.name())) {
+ consumers.add(createSkipConsumer(field.schema()));
+ } else {
+ Consumer consumer = createConsumer(field.schema(), field.name(), config);
+ consumers.add(consumer);
+ vectors.add(consumer.getVector());
+ }
+ }
+ } else {
+ Consumer consumer = createConsumer(schema, "", config);
+ consumers.add(consumer);
+ vectors.add(consumer.getVector());
+ }
+
+ long validConsumerCount = consumers.stream().filter(c -> !c.skippable()).count();
+ Preconditions.checkArgument(vectors.size() == validConsumerCount,
+ "vectors size not equals consumers size.");
+
+ List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
+
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
+
+ CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers);
+
+ int valueCount = 0;
+ try {
+ while (true) {
+ ValueVectorUtility.ensureCapacity(root, valueCount + 1);
+ compositeConsumer.consume(decoder);
+ valueCount++;
+ }
+ } catch (EOFException eof) {
+ // reach the end of encoder stream.
+ root.setRowCount(valueCount);
+ } catch (Exception e) {
+ compositeConsumer.close();
+ throw new UnsupportedOperationException("Error occurs while consume process.", e);
+ }
+
+ return root;
+ }
+
+ private static Map<String, String> getMetaData(Schema schema) {
+ Map<String, String> metadata = new HashMap<>();
+ schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString()));
+ return metadata;
+ }
+
+ private static Map<String, String> getMetaData(Schema schema, Map<String, String> externalProps) {
+ Map<String, String> metadata = getMetaData(schema);
+ if (externalProps != null) {
+ metadata.putAll(externalProps);
+ }
+ return metadata;
+ }
+
+ /**
+ * Parse avro attributes and convert them to metadata.
+ */
+ private static Map<String, String> createExternalProps(Schema schema) {
+ final Map<String, String> extProps = new HashMap<>();
+ String doc = schema.getDoc();
+ Set<String> aliases = schema.getAliases();
+ if (doc != null) {
+ extProps.put("doc", doc);
+ }
+ if (aliases != null) {
+ extProps.put("aliases", convertAliases(aliases));
+ }
+ return extProps;
+ }
+
+ private static FieldType createFieldType(ArrowType arrowType, Schema schema, Map<String, String> externalProps) {
+ return createFieldType(arrowType, schema, externalProps, /*dictionary=*/null);
+ }
+
+ private static FieldType createFieldType(
+ ArrowType arrowType,
+ Schema schema,
+ Map<String, String> externalProps,
+ DictionaryEncoding dictionary) {
+
+ return new FieldType(/*nullable=*/false, arrowType, dictionary,
+ getMetaData(schema, externalProps));
+ }
+
+ private static String convertAliases(Set<String> aliases) {
+ JsonStringArrayList jsonList = new JsonStringArrayList();
+ aliases.stream().forEach(a -> jsonList.add(a));
+ return jsonList.toString();
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
new file mode 100644
index 000000000..1faa7595c
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.consumers.CompositeAvroConsumer;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.ValueVectorUtility;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+
+/**
+ * VectorSchemaRoot iterator for partially converting avro data.
+ */
+public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
+
+ public static final int NO_LIMIT_BATCH_SIZE = -1;
+ public static final int DEFAULT_BATCH_SIZE = 1024;
+
+ private final Decoder decoder;
+ private final Schema schema;
+
+ private final AvroToArrowConfig config;
+
+ private CompositeAvroConsumer compositeConsumer;
+
+ private org.apache.arrow.vector.types.pojo.Schema rootSchema;
+
+ private VectorSchemaRoot nextBatch;
+
+ private final int targetBatchSize;
+
+ /**
+ * Construct an instance.
+ */
+ private AvroToArrowVectorIterator(
+ Decoder decoder,
+ Schema schema,
+ AvroToArrowConfig config) {
+
+ this.decoder = decoder;
+ this.schema = schema;
+ this.config = config;
+ this.targetBatchSize = config.getTargetBatchSize();
+
+ }
+
+ /**
+ * Create a ArrowVectorIterator to partially convert data.
+ */
+ public static AvroToArrowVectorIterator create(
+ Decoder decoder,
+ Schema schema,
+ AvroToArrowConfig config) {
+
+ AvroToArrowVectorIterator iterator = new AvroToArrowVectorIterator(decoder, schema, config);
+ try {
+ iterator.initialize();
+ return iterator;
+ } catch (Exception e) {
+ iterator.close();
+ throw new RuntimeException("Error occurs while creating iterator.", e);
+ }
+ }
+
+ private void initialize() {
+ // create consumers
+ compositeConsumer = AvroToArrowUtils.createCompositeConsumer(schema, config);
+ List<FieldVector> vectors = new ArrayList<>();
+ compositeConsumer.getConsumers().forEach(c -> vectors.add(c.getVector()));
+ List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
+ rootSchema = root.getSchema();
+
+ load(root);
+ }
+
+ private void consumeData(VectorSchemaRoot root) {
+ int readRowCount = 0;
+ try {
+ while ((targetBatchSize == NO_LIMIT_BATCH_SIZE || readRowCount < targetBatchSize)) {
+ compositeConsumer.consume(decoder);
+ readRowCount++;
+ }
+
+ if (targetBatchSize == NO_LIMIT_BATCH_SIZE) {
+ while (true) {
+ ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
+ compositeConsumer.consume(decoder);
+ readRowCount++;
+ }
+ } else {
+ while (readRowCount < targetBatchSize) {
+ compositeConsumer.consume(decoder);
+ readRowCount++;
+ }
+ }
+
+ root.setRowCount(readRowCount);
+ } catch (EOFException eof) {
+ // reach the end of encoder stream.
+ root.setRowCount(readRowCount);
+ } catch (Exception e) {
+ compositeConsumer.close();
+ throw new RuntimeException("Error occurs while consuming data.", e);
+ }
+ }
+
+ // Loads the next schema root or null if no more rows are available.
+ private void load(VectorSchemaRoot root) {
+ final int targetBatchSize = config.getTargetBatchSize();
+ if (targetBatchSize != NO_LIMIT_BATCH_SIZE) {
+ ValueVectorUtility.preAllocate(root, targetBatchSize);
+ }
+
+ long validConsumerCount = compositeConsumer.getConsumers().stream().filter(c ->
+ !c.skippable()).count();
+ Preconditions.checkArgument(root.getFieldVectors().size() == validConsumerCount,
+ "Schema root vectors size not equals to consumers size.");
+
+ compositeConsumer.resetConsumerVectors(root);
+
+ // consume data
+ consumeData(root);
+
+ if (root.getRowCount() == 0) {
+ root.close();
+ nextBatch = null;
+ } else {
+ nextBatch = root;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextBatch != null;
+ }
+
+ /**
+ * Gets the next vector. The user is responsible for freeing its resources.
+ */
+ public VectorSchemaRoot next() {
+ Preconditions.checkArgument(hasNext());
+ VectorSchemaRoot returned = nextBatch;
+ try {
+ load(VectorSchemaRoot.create(rootSchema, config.getAllocator()));
+ } catch (Exception e) {
+ returned.close();
+ throw new RuntimeException("Error occurs while getting next schema root.", e);
+ }
+ return returned;
+ }
+
+ /**
+ * Clean up resources.
+ */
+ public void close() {
+ if (nextBatch != null) {
+ nextBatch.close();
+ }
+ compositeConsumer.close();
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
new file mode 100644
index 000000000..b9d0f84cf
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume array type values from avro decoder.
+ * Write the data to {@link ListVector}.
+ */
+public class AvroArraysConsumer extends BaseAvroConsumer<ListVector> {
+
+ private final Consumer delegate;
+
+ /**
+ * Instantiate a ArrayConsumer.
+ */
+ public AvroArraysConsumer(ListVector vector, Consumer delegate) {
+ super(vector);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ vector.startNewValue(currentIndex);
+ long totalCount = 0;
+ for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) {
+ totalCount += count;
+ ensureInnerVectorCapacity(totalCount);
+ for (int element = 0; element < count; element++) {
+ delegate.consume(decoder);
+ }
+ }
+ vector.endValue(currentIndex, (int) totalCount);
+ currentIndex++;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ delegate.close();
+ }
+
+ @Override
+ public boolean resetValueVector(ListVector vector) {
+ this.delegate.resetValueVector(vector.getDataVector());
+ return super.resetValueVector(vector);
+ }
+
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ while (vector.getDataVector().getValueCapacity() < targetCapacity) {
+ vector.getDataVector().reAlloc();
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
new file mode 100644
index 000000000..4ca5f2445
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume boolean type values from avro decoder.
+ * Write the data to {@link BitVector}.
+ */
+public class AvroBooleanConsumer extends BaseAvroConsumer<BitVector> {
+
+ /**
+ * Instantiate a AvroBooleanConsumer.
+ */
+ public AvroBooleanConsumer(BitVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex, decoder.readBoolean() ? 1 : 0);
+ currentIndex++;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
new file mode 100644
index 000000000..eede68ebd
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume bytes type values from avro decoder.
+ * Write the data to {@link VarBinaryVector}.
+ */
+public class AvroBytesConsumer extends BaseAvroConsumer<VarBinaryVector> {
+
+ private ByteBuffer cacheBuffer;
+
+ /**
+ * Instantiate a AvroBytesConsumer.
+ */
+ public AvroBytesConsumer(VarBinaryVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ // cacheBuffer is initialized null and create in the first consume,
+ // if its capacity < size to read, decoder will create a new one with new capacity.
+ cacheBuffer = decoder.readBytes(cacheBuffer);
+ vector.setSafe(currentIndex, cacheBuffer, 0, cacheBuffer.limit());
+ currentIndex++;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
new file mode 100644
index 000000000..356707a14
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume double type values from avro decoder.
+ * Write the data to {@link Float8Vector}.
+ */
+public class AvroDoubleConsumer extends BaseAvroConsumer<Float8Vector> {
+
+ /**
+ * Instantiate a AvroDoubleConsumer.
+ */
+ public AvroDoubleConsumer(Float8Vector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readDouble());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java
new file mode 100644
index 000000000..2f4443b74
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume enum type values from avro decoder.
+ * Write the data to {@link IntVector}.
+ */
+public class AvroEnumConsumer extends BaseAvroConsumer<BaseIntVector> {
+
+ /**
+ * Instantiate a AvroEnumConsumer.
+ */
+ public AvroEnumConsumer(BaseIntVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.setWithPossibleTruncate(currentIndex++, decoder.readEnum());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java
new file mode 100644
index 000000000..a065466e3
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume fixed type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.FixedSizeBinaryVector}.
+ */
+public class AvroFixedConsumer extends BaseAvroConsumer<FixedSizeBinaryVector> {
+
+ private final byte[] reuseBytes;
+
+ /**
+ * Instantiate a AvroFixedConsumer.
+ */
+ public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) {
+ super(vector);
+ reuseBytes = new byte[size];
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ decoder.readFixed(reuseBytes);
+ vector.setSafe(currentIndex++, reuseBytes);
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
new file mode 100644
index 000000000..c8de4a21a
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume float type values from avro decoder.
+ * Write the data to {@link Float4Vector}.
+ */
+public class AvroFloatConsumer extends BaseAvroConsumer<Float4Vector> {
+
+ /**
+ * Instantiate a AvroFloatConsumer.
+ */
+ public AvroFloatConsumer(Float4Vector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readFloat());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
new file mode 100644
index 000000000..bc8d4de78
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume int type values from avro decoder.
+ * Write the data to {@link IntVector}.
+ */
+public class AvroIntConsumer extends BaseAvroConsumer<IntVector> {
+
+ /**
+ * Instantiate a AvroIntConsumer.
+ */
+ public AvroIntConsumer(IntVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readInt());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
new file mode 100644
index 000000000..b9016c58f
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume long type values from avro decoder.
+ * Write the data to {@link BigIntVector}.
+ */
+public class AvroLongConsumer extends BaseAvroConsumer<BigIntVector> {
+
+ /**
+ * Instantiate a AvroLongConsumer.
+ */
+ public AvroLongConsumer(BigIntVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readLong());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
new file mode 100644
index 000000000..b8e8bd585
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume map type values from avro decoder.
+ * Write the data to {@link MapVector}.
+ */
+public class AvroMapConsumer extends BaseAvroConsumer<MapVector> {
+
+ private final Consumer delegate;
+
+ /**
+ * Instantiate a AvroMapConsumer.
+ */
+ public AvroMapConsumer(MapVector vector, Consumer delegate) {
+ super(vector);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ vector.startNewValue(currentIndex);
+ long totalCount = 0;
+ for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) {
+ totalCount += count;
+ ensureInnerVectorCapacity(totalCount);
+ for (int element = 0; element < count; element++) {
+ delegate.consume(decoder);
+ }
+ }
+ vector.endValue(currentIndex, (int) totalCount);
+ currentIndex++;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ delegate.close();
+ }
+
+ @Override
+ public boolean resetValueVector(MapVector vector) {
+ this.delegate.resetValueVector(vector.getDataVector());
+ return super.resetValueVector(vector);
+ }
+
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ StructVector innerVector = (StructVector) vector.getDataVector();
+ for (FieldVector v : innerVector.getChildrenFromFields()) {
+ while (v.getValueCapacity() < targetCapacity) {
+ v.reAlloc();
+ }
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
new file mode 100644
index 000000000..64768008a
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.NullVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume null type values from avro decoder.
+ * Corresponding to {@link org.apache.arrow.vector.NullVector}.
+ */
+public class AvroNullConsumer extends BaseAvroConsumer<NullVector> {
+
+ public AvroNullConsumer(NullVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ currentIndex++;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
new file mode 100644
index 000000000..10fe234ac
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume string type values from avro decoder.
+ * Write the data to {@link VarCharVector}.
+ */
+public class AvroStringConsumer extends BaseAvroConsumer<VarCharVector> {
+
+ private ByteBuffer cacheBuffer;
+
+ /**
+ * Instantiate a AvroStringConsumer.
+ */
+ public AvroStringConsumer(VarCharVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ // cacheBuffer is initialized null and create in the first consume,
+ // if its capacity < size to read, decoder will create a new one with new capacity.
+ cacheBuffer = decoder.readBytes(cacheBuffer);
+ vector.setSafe(currentIndex++, cacheBuffer, 0, cacheBuffer.limit());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
new file mode 100644
index 000000000..792d01ee5
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume nested record type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.complex.StructVector}.
+ */
+public class AvroStructConsumer extends BaseAvroConsumer<StructVector> {
+
+ private final Consumer[] delegates;
+
+ /**
+ * Instantiate a AvroStructConsumer.
+ */
+ public AvroStructConsumer(StructVector vector, Consumer[] delegates) {
+ super(vector);
+ this.delegates = delegates;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ ensureInnerVectorCapacity(currentIndex + 1);
+ for (int i = 0; i < delegates.length; i++) {
+ delegates[i].consume(decoder);
+ }
+ vector.setIndexDefined(currentIndex);
+ currentIndex++;
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ AutoCloseables.close(delegates);
+ }
+
+ @Override
+ public boolean resetValueVector(StructVector vector) {
+ for (int i = 0; i < delegates.length; i++) {
+ delegates[i].resetValueVector(vector.getChildrenFromFields().get(i));
+ }
+ return super.resetValueVector(vector);
+ }
+
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ for (FieldVector v : vector.getChildrenFromFields()) {
+ while (v.getValueCapacity() < targetCapacity) {
+ v.reAlloc();
+ }
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
new file mode 100644
index 000000000..c0bb0200f
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume unions type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}.
+ */
+public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> {
+
+ private Consumer[] delegates;
+ private Types.MinorType[] types;
+
+ /**
+ * Instantiate an AvroUnionConsumer.
+ */
+ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
+
+ super(vector);
+ this.delegates = delegates;
+ this.types = types;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ int fieldIndex = decoder.readInt();
+
+ ensureInnerVectorCapacity(currentIndex + 1, fieldIndex);
+ Consumer delegate = delegates[fieldIndex];
+
+ vector.setType(currentIndex, types[fieldIndex]);
+ // In UnionVector we need to set sub vector writer position before consume a value
+ // because in the previous iterations we might not have written to the specific union sub vector.
+ delegate.setPosition(currentIndex);
+ delegate.consume(decoder);
+
+ currentIndex++;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ AutoCloseables.close(delegates);
+ }
+
+ @Override
+ public boolean resetValueVector(UnionVector vector) {
+ for (int i = 0; i < delegates.length; i++) {
+ delegates[i].resetValueVector(vector.getChildrenFromFields().get(i));
+ }
+ return super.resetValueVector(vector);
+ }
+
+ void ensureInnerVectorCapacity(long targetCapacity, int fieldIndex) {
+ ValueVector fieldVector = vector.getChildrenFromFields().get(fieldIndex);
+ if (fieldVector.getMinorType() == Types.MinorType.NULL) {
+ return;
+ }
+ while (fieldVector.getValueCapacity() < targetCapacity) {
+ fieldVector.reAlloc();
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java
new file mode 100644
index 000000000..303be8e50
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * Base class for non-skippable avro consumers.
+ * @param <T> vector type.
+ */
+public abstract class BaseAvroConsumer<T extends FieldVector> implements Consumer<T> {
+
+ protected T vector;
+ protected int currentIndex;
+
+ /**
+ * Constructs a base avro consumer.
+ * @param vector the vector to consume.
+ */
+ public BaseAvroConsumer(T vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public void addNull() {
+ currentIndex++;
+ }
+
+ @Override
+ public void setPosition(int index) {
+ currentIndex = index;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return vector;
+ }
+
+ @Override
+ public void close() throws Exception {
+ vector.close();
+ }
+
+ @Override
+ public boolean resetValueVector(T vector) {
+ this.vector = vector;
+ this.currentIndex = 0;
+ return true;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
new file mode 100644
index 000000000..af476d27c
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Composite consumer which hold all consumers.
+ * It manages the consume and cleanup process.
+ */
+public class CompositeAvroConsumer implements AutoCloseable {
+
+ private final List<Consumer> consumers;
+
+ public List<Consumer> getConsumers() {
+ return consumers;
+ }
+
+ public CompositeAvroConsumer(List<Consumer> consumers) {
+ this.consumers = consumers;
+ }
+
+ /**
+ * Consume decoder data.
+ */
+ public void consume(Decoder decoder) throws IOException {
+ for (Consumer consumer : consumers) {
+ consumer.consume(decoder);
+ }
+ }
+
+ /**
+ * Reset vector of consumers with the given {@link VectorSchemaRoot}.
+ */
+ public void resetConsumerVectors(VectorSchemaRoot root) {
+ int index = 0;
+ for (Consumer consumer : consumers) {
+ if (consumer.resetValueVector(root.getFieldVectors().get(index))) {
+ index++;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ // clean up
+ try {
+ AutoCloseables.close(consumers);
+ } catch (Exception e) {
+ throw new RuntimeException("Error occurs in close.", e);
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
new file mode 100644
index 000000000..8c4ee9a96
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Interface that is used to consume values from avro decoder.
+ * @param <T> The vector within consumer or its delegate, used for partially consume purpose.
+ */
+public interface Consumer<T extends FieldVector> extends AutoCloseable {
+
+ /**
+ * Consume a specific type value from avro decoder and write it to vector.
+ * @param decoder avro decoder to read data
+ * @throws IOException on error
+ */
+ void consume(Decoder decoder) throws IOException;
+
+ /**
+ * Add null value to vector by making writer position + 1.
+ */
+ void addNull();
+
+ /**
+ * Set the position to write value into vector.
+ */
+ void setPosition(int index);
+
+ /**
+ * Get the vector within the consumer.
+ */
+ FieldVector getVector();
+
+ /**
+ * Close this consumer when occurs exception to avoid potential leak.
+ */
+ void close() throws Exception;
+
+ /**
+ * Reset the vector within consumer for partial read purpose.
+ * @return true if reset is successful, false if reset is not needed.
+ */
+ boolean resetValueVector(T vector);
+
+ /**
+ * Indicates whether the consumer is type of {@link SkipConsumer}.
+ */
+ default boolean skippable() {
+ return false;
+ }
+
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java
new file mode 100644
index 000000000..94c5b339d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which skip (throw away) data from the decoder.
+ */
+public class SkipConsumer implements Consumer {
+
+ private final SkipFunction skipFunction;
+
+ public SkipConsumer(SkipFunction skipFunction) {
+ this.skipFunction = skipFunction;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ skipFunction.apply(decoder);
+ }
+
+ @Override
+ public void addNull() {
+ }
+
+ @Override
+ public void setPosition(int index) {
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public boolean resetValueVector(FieldVector vector) {
+ return false;
+ }
+
+ @Override
+ public boolean skippable() {
+ return true;
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java
new file mode 100644
index 000000000..61938916a
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.avro.io.Decoder;
+
+/**
+ * Adapter function to skip (throw away) data from the decoder.
+ */
+@FunctionalInterface
+public interface SkipFunction {
+ void apply(Decoder decoder) throws IOException;
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java
new file mode 100644
index 000000000..3aa8970d9
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume date type values from avro decoder.
+ * Write the data to {@link DateDayVector}.
+ */
+public class AvroDateConsumer extends BaseAvroConsumer<DateDayVector> {
+
+ /**
+ * Instantiate a AvroDateConsumer.
+ */
+ public AvroDateConsumer(DateDayVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readInt());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java
new file mode 100644
index 000000000..24d73cf82
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume decimal type values from avro decoder.
+ * Write the data to {@link DecimalVector}.
+ */
+public abstract class AvroDecimalConsumer extends BaseAvroConsumer<DecimalVector> {
+
+ /**
+ * Instantiate a AvroDecimalConsumer.
+ */
+ public AvroDecimalConsumer(DecimalVector vector) {
+ super(vector);
+ }
+
+ /**
+ * Consumer for decimal logical type with original bytes type.
+ */
+ public static class BytesDecimalConsumer extends AvroDecimalConsumer {
+
+ private ByteBuffer cacheBuffer;
+
+ /**
+ * Instantiate a BytesDecimalConsumer.
+ */
+ public BytesDecimalConsumer(DecimalVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ cacheBuffer = decoder.readBytes(cacheBuffer);
+ byte[] bytes = new byte[cacheBuffer.limit()];
+ Preconditions.checkArgument(bytes.length <= 16, "Decimal bytes length should <= 16.");
+ cacheBuffer.get(bytes);
+ vector.setBigEndian(currentIndex++, bytes);
+ }
+
+ }
+
+ /**
+ * Consumer for decimal logical type with original fixed type.
+ */
+ public static class FixedDecimalConsumer extends AvroDecimalConsumer {
+
+ private byte[] reuseBytes;
+
+ /**
+ * Instantiate a FixedDecimalConsumer.
+ */
+ public FixedDecimalConsumer(DecimalVector vector, int size) {
+ super(vector);
+ Preconditions.checkArgument(size <= 16, "Decimal bytes length should <= 16.");
+ reuseBytes = new byte[size];
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ decoder.readFixed(reuseBytes);
+ vector.setBigEndian(currentIndex++, reuseBytes);
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java
new file mode 100644
index 000000000..e68ba158f
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume date time-micro values from avro decoder.
+ * Write the data to {@link TimeMicroVector}.
+ */
+public class AvroTimeMicroConsumer extends BaseAvroConsumer<TimeMicroVector> {
+
+ /**
+ * Instantiate a AvroTimeMicroConsumer.
+ */
+ public AvroTimeMicroConsumer(TimeMicroVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readLong());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java
new file mode 100644
index 000000000..f76186fc3
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume date time-millis values from avro decoder.
+ * Write the data to {@link TimeMilliVector}.
+ */
+public class AvroTimeMillisConsumer extends BaseAvroConsumer<TimeMilliVector> {
+
+ /**
+ * Instantiate a AvroTimeMilliConsumer.
+ */
+ public AvroTimeMillisConsumer(TimeMilliVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readInt());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java
new file mode 100644
index 000000000..82da0e805
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume date timestamp-micro values from avro decoder.
+ * Write the data to {@link TimeStampMicroVector}.
+ */
+public class AvroTimestampMicrosConsumer extends BaseAvroConsumer<TimeStampMicroVector> {
+
+ /**
+ * Instantiate a AvroTimestampMicroConsumer.
+ */
+ public AvroTimestampMicrosConsumer(TimeStampMicroVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readLong());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java
new file mode 100644
index 000000000..159f49e14
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers.logical;
+
+import java.io.IOException;
+
+import org.apache.arrow.consumers.BaseAvroConsumer;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume date timestamp-millis values from avro decoder.
+ * Write the data to {@link TimeStampMilliVector}.
+ */
+public class AvroTimestampMillisConsumer extends BaseAvroConsumer<TimeStampMilliVector> {
+
+ /**
+ * Instantiate a AvroTimestampMillisConsumer.
+ */
+ public AvroTimestampMillisConsumer(TimeStampMilliVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ vector.set(currentIndex++, decoder.readLong());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java
new file mode 100644
index 000000000..050a50dda
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static junit.framework.TestCase.assertNull;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.util.DateUtility;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.junit.Test;
+
+public class AvroLogicalTypesTest extends AvroTestBase {
+
+ @Test
+ public void testTimestampMicros() throws Exception {
+ Schema schema = getSchema("logical/test_timestamp_micros.avsc");
+
+ List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L);
+ List<LocalDateTime> expected = Arrays.asList(
+ DateUtility.getLocalDateTimeFromEpochMicro(10000),
+ DateUtility.getLocalDateTimeFromEpochMicro(20000),
+ DateUtility.getLocalDateTimeFromEpochMicro(30000),
+ DateUtility.getLocalDateTimeFromEpochMicro(40000),
+ DateUtility.getLocalDateTimeFromEpochMicro(50000)
+ );
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testTimestampMillis() throws Exception {
+ Schema schema = getSchema("logical/test_timestamp_millis.avsc");
+
+ List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L);
+ List<LocalDateTime> expected = Arrays.asList(
+ DateUtility.getLocalDateTimeFromEpochMilli(10000),
+ DateUtility.getLocalDateTimeFromEpochMilli(20000),
+ DateUtility.getLocalDateTimeFromEpochMilli(30000),
+ DateUtility.getLocalDateTimeFromEpochMilli(40000),
+ DateUtility.getLocalDateTimeFromEpochMilli(50000)
+ );
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testTimeMicros() throws Exception {
+ Schema schema = getSchema("logical/test_time_micros.avsc");
+
+ List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testTimeMillis() throws Exception {
+ Schema schema = getSchema("logical/test_time_millis.avsc");
+
+ List<Integer> data = Arrays.asList(100, 200, 300, 400, 500);
+ List<LocalDateTime> expected = Arrays.asList(
+ DateUtility.getLocalDateTimeFromEpochMilli(100),
+ DateUtility.getLocalDateTimeFromEpochMilli(200),
+ DateUtility.getLocalDateTimeFromEpochMilli(300),
+ DateUtility.getLocalDateTimeFromEpochMilli(400),
+ DateUtility.getLocalDateTimeFromEpochMilli(500)
+ );
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testDate() throws Exception {
+ Schema schema = getSchema("logical/test_date.avsc");
+
+ List<Integer> data = Arrays.asList(100, 200, 300, 400, 500);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testDecimalWithOriginalBytes() throws Exception {
+ Schema schema = getSchema("logical/test_decimal_with_original_bytes.avsc");
+ List<ByteBuffer> data = new ArrayList<>();
+ List<BigDecimal> expected = new ArrayList<>();
+
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+
+ for (int i = 0; i < 5; i++) {
+ BigDecimal value = new BigDecimal(i * i).setScale(2);
+ ByteBuffer buffer = conversion.toBytes(value, schema, schema.getLogicalType());
+ data.add(buffer);
+ expected.add(value);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+ checkPrimitiveResult(expected, vector);
+
+ }
+
+ @Test
+ public void testDecimalWithOriginalFixed() throws Exception {
+ Schema schema = getSchema("logical/test_decimal_with_original_fixed.avsc");
+
+ List<GenericFixed> data = new ArrayList<>();
+ List<BigDecimal> expected = new ArrayList<>();
+
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+
+ for (int i = 0; i < 5; i++) {
+ BigDecimal value = new BigDecimal(i * i).setScale(2);
+ GenericFixed fixed = conversion.toFixed(value, schema, schema.getLogicalType());
+ data.add(fixed);
+ expected.add(value);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testInvalidDecimalPrecision() throws Exception {
+ Schema schema = getSchema("logical/test_decimal_invalid1.avsc");
+ List<ByteBuffer> data = new ArrayList<>();
+
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+
+ for (int i = 0; i < 5; i++) {
+ BigDecimal value = new BigDecimal(i * i).setScale(2);
+ ByteBuffer buffer = conversion.toBytes(value, schema, schema.getLogicalType());
+ data.add(buffer);
+ }
+
+ IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
+ () -> writeAndRead(schema, data));
+ assertTrue(e.getMessage().contains("Precision must be in range of 1 to 38"));
+
+ }
+
+ @Test
+ public void testFailedToCreateDecimalLogicalType() throws Exception {
+ // For decimal logical type, if avro validate schema failed, it will not create logical type,
+ // and the schema will be treated as its original type.
+
+ // java.lang.IllegalArgumentException: Invalid decimal scale: -1 (must be positive)
+ Schema schema1 = getSchema("logical/test_decimal_invalid2.avsc");
+ assertNull(schema1.getLogicalType());
+
+ // java.lang.IllegalArgumentException: Invalid decimal scale: 40 (greater than precision: 20)
+ Schema schema2 = getSchema("logical/test_decimal_invalid3.avsc");
+ assertNull(schema2.getLogicalType());
+
+ // java.lang.IllegalArgumentException: fixed(1) cannot store 30 digits (max 2)
+ Schema schema3 = getSchema("logical/test_decimal_invalid4.avsc");
+ assertNull(schema3.getLogicalType());
+ }
+
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java
new file mode 100644
index 000000000..b946dbd86
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+public class AvroSkipFieldTest extends AvroTestBase {
+
+ @Test
+ public void testSkipUnionWithOneField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_union_before.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_union_one_field_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i % 2 == 0 ? "test" + i : null);
+ record.put(2, i % 2 == 0 ? "test" + i : i);
+ record.put(3, i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(1));
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedData.add(expectedRecord);
+ }
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipUnionWithNullableOneField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_union_before.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_union_nullable_field_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i % 2 == 0 ? "test" + i : null);
+ record.put(2, i % 2 == 0 ? "test" + i : i);
+ record.put(3, i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedData.add(expectedRecord);
+ }
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipUnionWithMultiFields() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f2");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_union_before.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_union_multi_fields_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i % 2 == 0 ? "test" + i : null);
+ record.put(2, i % 2 == 0 ? "test" + i : i);
+ record.put(3, i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(1));
+ expectedRecord.put(2, record.get(3));
+ expectedData.add(expectedRecord);
+ }
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipMapField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_map_before.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_map_expected.avsc");
+
+ HashMap map = new HashMap();
+ map.put("key1", "value1");
+ map.put("key2", "value3");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, map);
+ record.put(2, i % 2 == 0);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(2));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipArrayField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_array_before.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_array_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, Arrays.asList("test" + i, "test" + i));
+ record.put(2, i % 2 == 0);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(2));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipMultiFields() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ skipFieldNames.add("f2");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("test_record.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_multi_fields_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i);
+ record.put(2, i % 2 == 0);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipStringField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f2");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base1.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_string_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ final byte[] testBytes = ("test" + i).getBytes();
+ GenericRecord record = new GenericData.Record(schema);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
+ fixed.bytes(testBytes);
+ record.put(0, fixed);
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2);
+ record.put(1, symbol);
+ record.put(2, "testtest" + i);
+ record.put(3, ByteBuffer.wrap(testBytes));
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, testBytes);
+ expectedRecord.put(1, (byte) i % 2);
+ expectedRecord.put(2, testBytes);
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipBytesField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f3");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base1.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_bytes_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ final byte[] testBytes = ("test" + i).getBytes();
+ GenericRecord record = new GenericData.Record(schema);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
+ fixed.bytes(testBytes);
+ record.put(0, fixed);
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2);
+ record.put(1, symbol);
+ record.put(2, "testtest" + i);
+ record.put(3, ByteBuffer.wrap(testBytes));
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, testBytes);
+ expectedRecord.put(1, (byte) i % 2);
+ expectedRecord.put(2, record.get(2));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipFixedField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base1.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_fixed_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ final byte[] testBytes = ("test" + i).getBytes();
+ GenericRecord record = new GenericData.Record(schema);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
+ fixed.bytes(testBytes);
+ record.put(0, fixed);
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2);
+ record.put(1, symbol);
+ record.put(2, "testtest" + i);
+ record.put(3, ByteBuffer.wrap(testBytes));
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, (byte) i % 2);
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipEnumField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base1.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_fixed_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ final byte[] testBytes = ("test" + i).getBytes();
+ GenericRecord record = new GenericData.Record(schema);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema());
+ fixed.bytes(testBytes);
+ record.put(0, fixed);
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2);
+ record.put(1, symbol);
+ record.put(2, "testtest" + i);
+ record.put(3, ByteBuffer.wrap(testBytes));
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, testBytes);
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipBooleanField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base2.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_boolean_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0);
+ record.put(1, i);
+ record.put(2, (long) i);
+ record.put(3, (float) i);
+ record.put(4, (double) i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(1));
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedRecord.put(3, record.get(4));
+
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipIntField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f1");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base2.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_int_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0);
+ record.put(1, i);
+ record.put(2, (long) i);
+ record.put(3, (float) i);
+ record.put(4, (double) i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(2));
+ expectedRecord.put(2, record.get(3));
+ expectedRecord.put(3, record.get(4));
+
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipLongField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f2");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base2.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_long_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0);
+ record.put(1, i);
+ record.put(2, (long) i);
+ record.put(3, (float) i);
+ record.put(4, (double) i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(1));
+ expectedRecord.put(2, record.get(3));
+ expectedRecord.put(3, record.get(4));
+
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipFloatField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f3");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base2.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_float_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0);
+ record.put(1, i);
+ record.put(2, (long) i);
+ record.put(3, (float) i);
+ record.put(4, (double) i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(1));
+ expectedRecord.put(2, record.get(2));
+ expectedRecord.put(3, record.get(4));
+
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipDoubleField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f4");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_base2.avsc");
+ Schema expectedSchema = getSchema("skip/test_skip_double_expected.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0);
+ record.put(1, i);
+ record.put(2, (long) i);
+ record.put(3, (float) i);
+ record.put(4, (double) i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, record.get(0));
+ expectedRecord.put(1, record.get(1));
+ expectedRecord.put(2, record.get(2));
+ expectedRecord.put(3, record.get(3));
+
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipRecordField() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("skip/test_skip_record_before.avsc");
+ Schema nestedSchema = schema.getFields().get(0).schema();
+ ArrayList<GenericRecord> data = new ArrayList<>();
+
+ Schema expectedSchema = getSchema("skip/test_skip_record_expected.avsc");
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put(0, "test" + i);
+ nestedRecord.put(1, i);
+ record.put(0, nestedRecord);
+ record.put(1, i);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ expectedRecord.put(0, i);
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipNestedFields() throws Exception {
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0.f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ Schema schema = getSchema("test_nested_record.avsc");
+ Schema nestedSchema = schema.getFields().get(0).schema();
+ ArrayList<GenericRecord> data = new ArrayList<>();
+
+ Schema expectedSchema = getSchema("skip/test_skip_second_level_expected.avsc");
+ Schema expectedNestedSchema = expectedSchema.getFields().get(0).schema();
+ ArrayList<GenericRecord> expectedData = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put(0, "test" + i);
+ nestedRecord.put(1, i);
+ record.put(0, nestedRecord);
+ data.add(record);
+
+ GenericRecord expectedRecord = new GenericData.Record(expectedSchema);
+ GenericRecord expectedNestedRecord = new GenericData.Record(expectedNestedSchema);
+ expectedNestedRecord.put(0, nestedRecord.get(1));
+ expectedRecord.put(0, expectedNestedRecord);
+ expectedData.add(expectedRecord);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkNestedRecordResult(expectedSchema, expectedData, root);
+ }
+
+ @Test
+ public void testSkipThirdLevelField() throws Exception {
+ Schema firstLevelSchema = getSchema("skip/test_skip_third_level_expected.avsc");
+ Schema secondLevelSchema = firstLevelSchema.getFields().get(0).schema();
+ Schema thirdLevelSchema = secondLevelSchema.getFields().get(0).schema();
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord firstLevelRecord = new GenericData.Record(firstLevelSchema);
+ GenericRecord secondLevelRecord = new GenericData.Record(secondLevelSchema);
+ GenericRecord thirdLevelRecord = new GenericData.Record(thirdLevelSchema);
+
+ thirdLevelRecord.put(0, i);
+ thirdLevelRecord.put(1, "test" + i);
+ thirdLevelRecord.put(2, i % 2 == 0);
+
+ secondLevelRecord.put(0, thirdLevelRecord);
+ firstLevelRecord.put(0, secondLevelRecord);
+ data.add(firstLevelRecord);
+ }
+
+ // do not skip any fields first
+ VectorSchemaRoot root1 = writeAndRead(firstLevelSchema, data);
+
+ assertEquals(1, root1.getFieldVectors().size());
+ assertEquals(Types.MinorType.STRUCT, root1.getFieldVectors().get(0).getMinorType());
+ StructVector secondLevelVector = (StructVector) root1.getFieldVectors().get(0);
+ assertEquals(1, secondLevelVector.getChildrenFromFields().size());
+ assertEquals(Types.MinorType.STRUCT, secondLevelVector.getChildrenFromFields().get(0).getMinorType());
+ StructVector thirdLevelVector = (StructVector) secondLevelVector.getChildrenFromFields().get(0);
+ assertEquals(3, thirdLevelVector.getChildrenFromFields().size());
+
+ // skip third level field and validate
+ Set<String> skipFieldNames = new HashSet<>();
+ skipFieldNames.add("f0.f0.f0");
+ config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build();
+ VectorSchemaRoot root2 = writeAndRead(firstLevelSchema, data);
+
+ assertEquals(1, root2.getFieldVectors().size());
+ assertEquals(Types.MinorType.STRUCT, root2.getFieldVectors().get(0).getMinorType());
+ StructVector secondStruct = (StructVector) root2.getFieldVectors().get(0);
+ assertEquals(1, secondStruct.getChildrenFromFields().size());
+ assertEquals(Types.MinorType.STRUCT, secondStruct.getChildrenFromFields().get(0).getMinorType());
+ StructVector thirdStruct = (StructVector) secondStruct.getChildrenFromFields().get(0);
+ assertEquals(2, thirdStruct.getChildrenFromFields().size());
+
+ assertEquals(Types.MinorType.INT, thirdStruct.getChildrenFromFields().get(0).getMinorType());
+ assertEquals(Types.MinorType.BIT, thirdStruct.getChildrenFromFields().get(1).getMinorType());
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java
new file mode 100644
index 000000000..a00cd7704
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.util.Text;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroTestBase {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ protected AvroToArrowConfig config;
+
+ @Before
+ public void init() {
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ config = new AvroToArrowConfigBuilder(allocator).build();
+ }
+
+ protected Schema getSchema(String schemaName) throws Exception {
+ Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(),
+ "schema", schemaName);
+ return new Schema.Parser().parse(schemaPath.toFile());
+ }
+
+ protected VectorSchemaRoot writeAndRead(Schema schema, List data) throws Exception {
+ File dataFile = TMP.newFile();
+
+ BinaryEncoder
+ encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null);
+ DatumWriter writer = new GenericDatumWriter(schema);
+ BinaryDecoder
+ decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null);
+
+ for (Object value : data) {
+ writer.write(value, encoder);
+ }
+
+ return AvroToArrow.avroToArrow(schema, decoder, config);
+ }
+
+ protected void checkArrayResult(List<List<?>> expected, ListVector vector) {
+ assertEquals(expected.size(), vector.getValueCount());
+ for (int i = 0; i < expected.size(); i++) {
+ checkArrayElement(expected.get(i), vector.getObject(i));
+ }
+ }
+
+ protected void checkArrayElement(List expected, List actual) {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ Object value1 = expected.get(i);
+ Object value2 = actual.get(i);
+ if (value1 == null) {
+ assertTrue(value2 == null);
+ continue;
+ }
+ if (value2 instanceof byte[]) {
+ value2 = ByteBuffer.wrap((byte[]) value2);
+ } else if (value2 instanceof Text) {
+ value2 = value2.toString();
+ }
+ assertEquals(value1, value2);
+ }
+ }
+
+ protected void checkPrimitiveResult(List data, FieldVector vector) {
+ assertEquals(data.size(), vector.getValueCount());
+ for (int i = 0; i < data.size(); i++) {
+ Object value1 = data.get(i);
+ Object value2 = vector.getObject(i);
+ if (value1 == null) {
+ assertTrue(value2 == null);
+ continue;
+ }
+ if (value2 instanceof byte[]) {
+ value2 = ByteBuffer.wrap((byte[]) value2);
+ if (value1 instanceof byte[]) {
+ value1 = ByteBuffer.wrap((byte[]) value1);
+ }
+ } else if (value2 instanceof Text) {
+ value2 = value2.toString();
+ } else if (value2 instanceof Byte) {
+ value2 = ((Byte) value2).intValue();
+ }
+ assertEquals(value1, value2);
+ }
+ }
+
+ protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, VectorSchemaRoot root) {
+ assertEquals(data.size(), root.getRowCount());
+ assertEquals(schema.getFields().size(), root.getFieldVectors().size());
+
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ ArrayList fieldData = new ArrayList();
+ for (GenericRecord record : data) {
+ fieldData.add(record.get(i));
+ }
+
+ checkPrimitiveResult(fieldData, root.getFieldVectors().get(i));
+ }
+
+ }
+
+ protected void checkNestedRecordResult(Schema schema, List<GenericRecord> data, VectorSchemaRoot root) {
+ assertEquals(data.size(), root.getRowCount());
+ assertTrue(schema.getFields().size() == 1);
+
+ final Schema nestedSchema = schema.getFields().get(0).schema();
+ final StructVector structVector = (StructVector) root.getFieldVectors().get(0);
+
+ for (int i = 0; i < nestedSchema.getFields().size(); i++) {
+ ArrayList fieldData = new ArrayList();
+ for (GenericRecord record : data) {
+ GenericRecord nestedRecord = (GenericRecord) record.get(0);
+ fieldData.add(nestedRecord.get(i));
+ }
+
+ checkPrimitiveResult(fieldData, structVector.getChildrenFromFields().get(i));
+ }
+
+ }
+
+
+ // belows are for iterator api
+
+ protected void checkArrayResult(List<List<?>> expected, List<ListVector> vectors) {
+ int valueCount = vectors.stream().mapToInt(v -> v.getValueCount()).sum();
+ assertEquals(expected.size(), valueCount);
+
+ int index = 0;
+ for (ListVector vector : vectors) {
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ checkArrayElement(expected.get(index++), vector.getObject(i));
+ }
+ }
+ }
+
+ protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, List<VectorSchemaRoot> roots) {
+ roots.forEach(root -> {
+ assertEquals(schema.getFields().size(), root.getFieldVectors().size());
+ });
+
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ List fieldData = new ArrayList();
+ List<FieldVector> vectors = new ArrayList<>();
+ for (GenericRecord record : data) {
+ fieldData.add(record.get(i));
+ }
+ final int columnIndex = i;
+ roots.forEach(root -> vectors.add(root.getFieldVectors().get(columnIndex)));
+
+ checkPrimitiveResult(fieldData, vectors);
+ }
+
+ }
+
+ protected void checkPrimitiveResult(List data, List<FieldVector> vectors) {
+ int valueCount = vectors.stream().mapToInt(v -> v.getValueCount()).sum();
+ assertEquals(data.size(), valueCount);
+
+ int index = 0;
+ for (FieldVector vector : vectors) {
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ Object value1 = data.get(index++);
+ Object value2 = vector.getObject(i);
+ if (value1 == null) {
+ assertNull(value2);
+ continue;
+ }
+ if (value2 instanceof byte[]) {
+ value2 = ByteBuffer.wrap((byte[]) value2);
+ if (value1 instanceof byte[]) {
+ value1 = ByteBuffer.wrap((byte[]) value1);
+ }
+ } else if (value2 instanceof Text) {
+ value2 = value2.toString();
+ }
+ assertEquals(value1, value2);
+ }
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java
new file mode 100644
index 000000000..2b05a19f3
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class AvroToArrowIteratorTest extends AvroTestBase {
+
+ @Override
+ public void init() {
+ final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ this.config = new AvroToArrowConfigBuilder(allocator).setTargetBatchSize(3).build();
+ }
+
+ private AvroToArrowVectorIterator convert(Schema schema, List data) throws Exception {
+ File dataFile = TMP.newFile();
+
+ BinaryEncoder
+ encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null);
+ DatumWriter writer = new GenericDatumWriter(schema);
+ BinaryDecoder
+ decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null);
+
+ for (Object value : data) {
+ writer.write(value, encoder);
+ }
+
+ return AvroToArrow.avroToArrowIterator(schema, decoder, config);
+ }
+
+ @Test
+ public void testStringType() throws Exception {
+ Schema schema = getSchema("test_primitive_string.avsc");
+ List<String> data = Arrays.asList("v1", "v2", "v3", "v4", "v5");
+
+ List<VectorSchemaRoot> roots = new ArrayList<>();
+ List<FieldVector> vectors = new ArrayList<>();
+ try (AvroToArrowVectorIterator iterator = convert(schema, data)) {
+ while (iterator.hasNext()) {
+ VectorSchemaRoot root = iterator.next();
+ FieldVector vector = root.getFieldVectors().get(0);
+ roots.add(root);
+ vectors.add(vector);
+ }
+ }
+ checkPrimitiveResult(data, vectors);
+ AutoCloseables.close(roots);
+ }
+
+ @Test
+ public void testNullableStringType() throws Exception {
+ Schema schema = getSchema("test_nullable_string.avsc");
+
+ List<GenericRecord> data = new ArrayList<>();
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ String value = i % 2 == 0 ? "test" + i : null;
+ record.put(0, value);
+ expected.add(value);
+ data.add(record);
+ }
+
+ List<VectorSchemaRoot> roots = new ArrayList<>();
+ List<FieldVector> vectors = new ArrayList<>();
+ try (AvroToArrowVectorIterator iterator = convert(schema, data);) {
+ while (iterator.hasNext()) {
+ VectorSchemaRoot root = iterator.next();
+ FieldVector vector = root.getFieldVectors().get(0);
+ roots.add(root);
+ vectors.add(vector);
+ }
+ }
+ checkPrimitiveResult(expected, vectors);
+ AutoCloseables.close(roots);
+
+ }
+
+ @Test
+ public void testRecordType() throws Exception {
+ Schema schema = getSchema("test_record.avsc");
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i);
+ record.put(2, i % 2 == 0);
+ data.add(record);
+ }
+
+ List<VectorSchemaRoot> roots = new ArrayList<>();
+ try (AvroToArrowVectorIterator iterator = convert(schema, data)) {
+ while (iterator.hasNext()) {
+ roots.add(iterator.next());
+ }
+ }
+ checkRecordResult(schema, data, roots);
+ AutoCloseables.close(roots);
+
+ }
+
+ @Test
+ public void testArrayType() throws Exception {
+ Schema schema = getSchema("test_array.avsc");
+ List<List<?>> data = Arrays.asList(
+ Arrays.asList("11", "222", "999"),
+ Arrays.asList("12222", "2333", "1000"),
+ Arrays.asList("1rrr", "2ggg"),
+ Arrays.asList("1vvv", "2bbb"),
+ Arrays.asList("1fff", "2"));
+
+ List<VectorSchemaRoot> roots = new ArrayList<>();
+ List<ListVector> vectors = new ArrayList<>();
+ try (AvroToArrowVectorIterator iterator = convert(schema, data)) {
+ while (iterator.hasNext()) {
+ VectorSchemaRoot root = iterator.next();
+ roots.add(root);
+ vectors.add((ListVector) root.getFieldVectors().get(0));
+ }
+ }
+ checkArrayResult(data, vectors);
+ AutoCloseables.close(roots);
+ }
+
+ @Test
+ public void runLargeNumberOfRows() throws Exception {
+ Schema schema = getSchema("test_large_data.avsc");
+ int x = 0;
+ final int targetRows = 600000;
+ Decoder fakeDecoder = new FakeDecoder(targetRows);
+ try (AvroToArrowVectorIterator iter = AvroToArrow.avroToArrowIterator(schema, fakeDecoder,
+ new AvroToArrowConfigBuilder(config.getAllocator()).build())) {
+ while (iter.hasNext()) {
+ VectorSchemaRoot root = iter.next();
+ x += root.getRowCount();
+ root.close();
+ }
+ }
+
+ assertEquals(x, targetRows);
+ }
+
+ /**
+ * Fake avro decoder to test large data.
+ */
+ private class FakeDecoder extends Decoder {
+
+ private int numRows;
+
+ FakeDecoder(int numRows) {
+ this.numRows = numRows;
+ }
+
+ // note that Decoder has no hasNext() API, assume enum is the first type in schema
+ // and fixed is the last type in schema and they are unique.
+ private void validate() throws EOFException {
+ if (numRows <= 0) {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public void readNull() throws IOException {
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return false;
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public Utf8 readString(Utf8 old) throws IOException {
+ return new Utf8("test123test123" + numRows);
+ }
+
+ @Override
+ public String readString() throws IOException {
+ return "test123test123" + numRows;
+ }
+
+ @Override
+ public void skipString() throws IOException {
+
+ }
+
+ @Override
+ public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+ return ByteBuffer.allocate(0);
+ }
+
+ @Override
+ public void skipBytes() throws IOException {
+
+ }
+
+ @Override
+ public void readFixed(byte[] bytes, int start, int length) throws IOException {
+ // fixed type is last column, after read value, decrease numRows
+ numRows--;
+ }
+
+ @Override
+ public void skipFixed(int length) throws IOException {
+
+ }
+
+ @Override
+ public int readEnum() throws IOException {
+ // enum type is first column, validate numRows first.
+ validate();
+ return 0;
+ }
+
+ @Override
+ public long readArrayStart() throws IOException {
+ return 5;
+ }
+
+ @Override
+ public long arrayNext() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long skipArray() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long readMapStart() throws IOException {
+ return 5;
+ }
+
+ @Override
+ public long mapNext() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long skipMap() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int readIndex() throws IOException {
+ return 0;
+ }
+ }
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
new file mode 100644
index 000000000..c007e1ac7
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+public class AvroToArrowTest extends AvroTestBase {
+
+ @Test
+ public void testStringType() throws Exception {
+ Schema schema = getSchema("test_primitive_string.avsc");
+ List<String> data = Arrays.asList("v1", "v2", "v3", "v4", "v5");
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableStringType() throws Exception {
+ Schema schema = getSchema("test_nullable_string.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? "test" + i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testRecordType() throws Exception {
+ Schema schema = getSchema("test_record.avsc");
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, "test" + i);
+ record.put(1, i);
+ record.put(2, i % 2 == 0);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testFixedAttributes() throws Exception {
+ Schema schema = getSchema("attrs/test_fixed_attr.avsc");
+
+ List<GenericData.Fixed> data = new ArrayList<>();
+ List<byte[]> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
+ expected.add(value);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema);
+ fixed.bytes(value);
+ data.add(fixed);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ Map<String, String> metadata = vector.getField().getMetadata();
+ assertEquals("fixed doc", metadata.get("doc"));
+ assertEquals("[\"alias1\",\"alias2\"]", metadata.get("aliases"));
+ }
+
+ @Test
+ public void testEnumAttributes() throws Exception {
+ Schema schema = getSchema("attrs/test_enum_attrs.avsc");
+ List<GenericData.EnumSymbol> data = Arrays.asList(
+ new GenericData.EnumSymbol(schema, "SPADES"),
+ new GenericData.EnumSymbol(schema, "HEARTS"),
+ new GenericData.EnumSymbol(schema, "DIAMONDS"),
+ new GenericData.EnumSymbol(schema, "CLUBS"),
+ new GenericData.EnumSymbol(schema, "SPADES"));
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ Map<String, String> metadata = vector.getField().getMetadata();
+ assertEquals("enum doc", metadata.get("doc"));
+ assertEquals("[\"alias1\",\"alias2\"]", metadata.get("aliases"));
+ }
+
+ @Test
+ public void testRecordAttributes() throws Exception {
+ Schema schema = getSchema("attrs/test_record_attrs.avsc");
+ Schema nestedSchema = schema.getFields().get(0).schema();
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put(0, "test" + i);
+ nestedRecord.put(1, i);
+ record.put(0, nestedRecord);
+
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+
+ StructVector structVector = (StructVector) root.getFieldVectors().get(0);
+ Map<String, String> structMeta = structVector.getField().getMetadata();
+ Map<String, String> childMeta1 = structVector.getChildByOrdinal(0).getField().getMetadata();
+ Map<String, String> childMeta2 = structVector.getChildByOrdinal(1).getField().getMetadata();
+
+ assertEquals("f0 doc", structMeta.get("doc"));
+ assertEquals("[\"f0.a1\"]", structMeta.get("aliases"));
+ assertEquals("f1 doc", childMeta1.get("doc"));
+ assertEquals("[\"f1.a1\",\"f1.a2\"]", childMeta1.get("aliases"));
+ assertEquals("f2 doc", childMeta2.get("doc"));
+ assertEquals("[\"f2.a1\",\"f2.a2\"]", childMeta2.get("aliases"));
+ }
+
+ @Test
+ public void testNestedRecordType() throws Exception {
+ Schema schema = getSchema("test_nested_record.avsc");
+ Schema nestedSchema = schema.getFields().get(0).schema();
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put(0, "test" + i);
+ nestedRecord.put(1, i);
+ record.put(0, nestedRecord);
+
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkNestedRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testEnumType() throws Exception {
+ Schema schema = getSchema("test_primitive_enum.avsc");
+ List<GenericData.EnumSymbol> data = Arrays.asList(
+ new GenericData.EnumSymbol(schema, "SPADES"),
+ new GenericData.EnumSymbol(schema, "HEARTS"),
+ new GenericData.EnumSymbol(schema, "DIAMONDS"),
+ new GenericData.EnumSymbol(schema, "CLUBS"),
+ new GenericData.EnumSymbol(schema, "SPADES"));
+
+ List<Integer> expectedIndices = Arrays.asList(0, 1, 2, 3, 0);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expectedIndices, vector);
+
+ VarCharVector dictVector = (VarCharVector) config.getProvider().lookup(0).getVector();
+ assertEquals(4, dictVector.getValueCount());
+
+ assertEquals("SPADES", dictVector.getObject(0).toString());
+ assertEquals("HEARTS", dictVector.getObject(1).toString());
+ assertEquals("DIAMONDS", dictVector.getObject(2).toString());
+ assertEquals("CLUBS", dictVector.getObject(3).toString());
+ }
+
+ @Test
+ public void testIntType() throws Exception {
+ Schema schema = getSchema("test_primitive_int.avsc");
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableIntType() throws Exception {
+ Schema schema = getSchema("test_nullable_int.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testLongType() throws Exception {
+ Schema schema = getSchema("test_primitive_long.avsc");
+ List<Long> data = Arrays.asList(1L, 2L, 3L, 4L, 5L);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableLongType() throws Exception {
+ Schema schema = getSchema("test_nullable_long.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? (long) i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testFloatType() throws Exception {
+ Schema schema = getSchema("test_primitive_float.avsc");
+ List<Float> data = Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableFloatType() throws Exception {
+ Schema schema = getSchema("test_nullable_float.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i + 0.1f : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testDoubleType() throws Exception {
+ Schema schema = getSchema("test_primitive_double.avsc");
+ List<Double> data = Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableDoubleType() throws Exception {
+ Schema schema = getSchema("test_nullable_double.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i + 0.1 : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testBytesType() throws Exception {
+ Schema schema = getSchema("test_primitive_bytes.avsc");
+ List<ByteBuffer> data = Arrays.asList(
+ ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8)));
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableBytesType() throws Exception {
+ Schema schema = getSchema("test_nullable_bytes.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)) : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testBooleanType() throws Exception {
+ Schema schema = getSchema("test_primitive_boolean.avsc");
+ List<Boolean> data = Arrays.asList(true, false, true, false, true);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableBooleanType() throws Exception {
+ Schema schema = getSchema("test_nullable_boolean.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? true : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ @Test
+ public void testArrayType() throws Exception {
+ Schema schema = getSchema("test_array.avsc");
+ List<List<?>> data = Arrays.asList(
+ Arrays.asList("11", "222", "999"),
+ Arrays.asList("12222", "2333", "1000"),
+ Arrays.asList("1rrr", "2ggg"),
+ Arrays.asList("1vvv", "2bbb"),
+ Arrays.asList("1fff", "2"));
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkArrayResult(data, (ListVector) vector);
+ }
+
+ @Test
+ public void testMapType() throws Exception {
+ Schema schema = getSchema("test_map.avsc");
+
+ List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6");
+ List vals = Arrays.asList("val1", "val2", "val3", "val4", "val5", "val6");
+
+ List<LinkedHashMap> data = new ArrayList<>();
+ LinkedHashMap map1 = new LinkedHashMap();
+ map1.put(keys.get(0), vals.get(0));
+ map1.put(keys.get(1), vals.get(1));
+ data.add(map1);
+
+ LinkedHashMap map2 = new LinkedHashMap();
+ map2.put(keys.get(2), vals.get(2));
+ map2.put(keys.get(3), vals.get(3));
+ data.add(map2);
+
+ LinkedHashMap map3 = new LinkedHashMap();
+ map3.put(keys.get(4), vals.get(4));
+ map3.put(keys.get(5), vals.get(5));
+ data.add(map3);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ MapVector vector = (MapVector) root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0));
+ checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1));
+ assertEquals(0, vector.getOffsetBuffer().getInt(0));
+ assertEquals(2, vector.getOffsetBuffer().getInt(1 * 4));
+ assertEquals(4, vector.getOffsetBuffer().getInt(2 * 4));
+ assertEquals(6, vector.getOffsetBuffer().getInt(3 * 4));
+ }
+
+ @Test
+ public void testFixedType() throws Exception {
+ Schema schema = getSchema("test_fixed.avsc");
+
+ List<GenericData.Fixed> data = new ArrayList<>();
+ List<byte[]> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
+ expected.add(value);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema);
+ fixed.bytes(value);
+ data.add(fixed);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testUnionType() throws Exception {
+ Schema schema = getSchema("test_union.avsc");
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<Object> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? "test" + i : i);
+ expected.add(i % 2 == 0 ? "test" + i : i);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+ @Test
+ public void testNullableUnionType() throws Exception {
+ Schema schema = getSchema("test_nullable_union.avsc");
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<Object> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ if (i % 3 == 0) {
+ record.put(0, "test" + i);
+ expected.add("test" + i);
+ data.add(record);
+ } else if (i % 3 == 1) {
+ record.put(0, i);
+ expected.add(i);
+ data.add(record);
+ } else {
+ record.put(0, null);
+ expected.add(null);
+ data.add(record);
+ }
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
+}
diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java
new file mode 100644
index 000000000..bf695d193
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+public class TestWriteReadAvroRecord {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ @Test
+ public void testWriteAndRead() throws Exception {
+
+ File dataFile = TMP.newFile();
+ Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test.avsc");
+ Schema schema = new Schema.Parser().parse(schemaPath.toFile());
+
+ //write data to disk
+ GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("name", "Alyssa");
+ user1.put("favorite_number", 256);
+
+ GenericRecord user2 = new GenericData.Record(schema);
+ user2.put("name", "Ben");
+ user2.put("favorite_number", 7);
+ user2.put("favorite_color", "red");
+
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+ dataFileWriter.create(schema, dataFile);
+ dataFileWriter.append(user1);
+ dataFileWriter.append(user2);
+ dataFileWriter.close();
+
+ //read data from disk
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
+ DataFileReader<GenericRecord>
+ dataFileReader = new DataFileReader<GenericRecord>(dataFile, datumReader);
+ List<GenericRecord> result = new ArrayList<>();
+ while (dataFileReader.hasNext()) {
+ GenericRecord user = dataFileReader.next();
+ result.add(user);
+ }
+
+ assertEquals(2, result.size());
+ GenericRecord deUser1 = result.get(0);
+ assertEquals("Alyssa", deUser1.get("name").toString());
+ assertEquals(256, deUser1.get("favorite_number"));
+ assertEquals(null, deUser1.get("favorite_color"));
+
+ GenericRecord deUser2 = result.get(1);
+ assertEquals("Ben", deUser2.get("name").toString());
+ assertEquals(7, deUser2.get("favorite_number"));
+ assertEquals("red", deUser2.get("favorite_color").toString());
+ }
+
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc
new file mode 100644
index 000000000..afd00b8d9
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "type": "enum",
+ "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"],
+ "name": "testEnum",
+ "doc" : "enum doc",
+ "aliases" : ["alias1", "alias2"]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc
new file mode 100644
index 000000000..55e504def
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "type": "fixed",
+ "size": 6,
+ "name": "testFixed",
+ "doc" : "fixed doc",
+ "aliases" : ["alias1", "alias2"]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc
new file mode 100644
index 000000000..2e2e311a9
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testAttrs",
+ "fields": [
+ {
+ "name" : "f0",
+ "type" : {
+ "type" : "record",
+ "name" : "nestedInRecord",
+ "doc" : "f0 doc",
+ "aliases" : ["f0.a1"],
+ "fields": [
+ {"name": "f1", "type": "string", "doc": "f1 doc", "aliases" : ["f1.a1", "f1.a2"]},
+ {"name": "f2", "type": "int", "doc": "f2 doc", "aliases" : ["f2.a1", "f2.a2"]}
+ ]
+ }
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc
new file mode 100644
index 000000000..f661e6506
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "int",
+ "logicalType" : "date"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc
new file mode 100644
index 000000000..18d7d63fc
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "bytes",
+ "logicalType" : "decimal",
+ "precision": 39,
+ "scale": 2
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc
new file mode 100644
index 000000000..eed7bd781
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "bytes",
+ "logicalType" : "decimal",
+ "precision": 20,
+ "scale": -1
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc
new file mode 100644
index 000000000..1667b8aff
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "bytes",
+ "logicalType" : "decimal",
+ "precision": 20,
+ "scale": 40
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc
new file mode 100644
index 000000000..e1f710416
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "fixed",
+ "size" : 1,
+ "logicalType" : "decimal",
+ "precision": 30,
+ "scale": 2
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc
new file mode 100644
index 000000000..944b5d85d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "bytes",
+ "logicalType" : "decimal",
+ "precision": 10,
+ "scale": 2
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc
new file mode 100644
index 000000000..1901f90a9
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "fixed",
+ "size" : 10,
+ "logicalType" : "decimal",
+ "precision": 10,
+ "scale": 2
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc
new file mode 100644
index 000000000..ee7d4e937
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "long",
+ "logicalType" : "time-micros"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc
new file mode 100644
index 000000000..54877babc
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "int",
+ "logicalType" : "time-millis"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc
new file mode 100644
index 000000000..15c0bf53d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "long",
+ "logicalType" : "timestamp-micros"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc
new file mode 100644
index 000000000..822a2c360
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "name": "test",
+ "type": "long",
+ "logicalType" : "timestamp-millis"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc
new file mode 100644
index 000000000..e836aa768
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f1", "type": {"type" : "array", "items": "string"}},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc
new file mode 100644
index 000000000..36e7fdfb0
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc
new file mode 100644
index 000000000..5338253f4
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}},
+ {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}},
+ {"name": "f2", "type": "string"},
+ {"name": "f3", "type": "bytes"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc
new file mode 100644
index 000000000..50655a70e
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "boolean"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "long"},
+ {"name": "f3", "type": "float"},
+ {"name": "f4", "type": "double"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc
new file mode 100644
index 000000000..9b62e3149
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "long"},
+ {"name": "f3", "type": "float"},
+ {"name": "f4", "type": "double"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc
new file mode 100644
index 000000000..8a1903b34
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}},
+ {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}},
+ {"name": "f2", "type": "string"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc
new file mode 100644
index 000000000..6021c4454
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "boolean"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "long"},
+ {"name": "f3", "type": "float"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc
new file mode 100644
index 000000000..f5ed86a28
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}},
+ {"name": "f2", "type": "string"},
+ {"name": "f3", "type": "bytes"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc
new file mode 100644
index 000000000..5423a7977
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}},
+ {"name": "f2", "type": "string"},
+ {"name": "f3", "type": "bytes"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc
new file mode 100644
index 000000000..dea106331
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "boolean"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "long"},
+ {"name": "f4", "type": "double"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc
new file mode 100644
index 000000000..53d4f1025
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "boolean"},
+ {"name": "f2", "type": "long"},
+ {"name": "f3", "type": "float"},
+ {"name": "f4", "type": "double"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc
new file mode 100644
index 000000000..bf16601dd
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "boolean"},
+ {"name": "f1", "type": "int"},
+ {"name": "f3", "type": "float"},
+ {"name": "f4", "type": "double"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc
new file mode 100644
index 000000000..8cbb1a1d7
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f1", "type": {"type" : "map", "values": "string"}},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc
new file mode 100644
index 000000000..36e7fdfb0
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc
new file mode 100644
index 000000000..b5d637b1d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testSkip",
+ "fields": [
+ {"name": "f0", "type": "string"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc
new file mode 100644
index 000000000..7aee92b92
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {
+ "name" : "f0",
+ "type" : {
+ "type" : "record",
+ "name" : "nestedInRecord",
+ "fields": [
+ {"name": "f00", "type": "string"},
+ {"name": "f01", "type": "int"}
+ ]
+ }
+ },
+ {
+ "name" : "f1", "type" : "int"
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc
new file mode 100644
index 000000000..3e2495203
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ { "name" : "f1", "type" : "int"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc
new file mode 100644
index 000000000..f3b7f8c09
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testSkipNested",
+ "fields": [
+ {
+ "name" : "nested",
+ "type" : {
+ "type" : "record",
+ "name" : "nestedInRecord",
+ "fields": [
+ {"name": "f1", "type": "int"}
+ ]
+ }
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc
new file mode 100644
index 000000000..553525847
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testSkip",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc
new file mode 100644
index 000000000..2d2c08174
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}},
+ {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}},
+ {"name": "f3", "type": "bytes"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc
new file mode 100644
index 000000000..6f42da893
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "firstLevel",
+ "fields": [
+ {
+ "name" : "f0",
+ "type" : {
+ "type" : "record",
+ "name" : "secondLevel",
+ "fields": [
+ {
+ "name" : "f0",
+ "type" : {
+ "type" : "record",
+ "name" : "thirdLevel",
+ "fields" : [
+ {"name": "f1", "type": "int"},
+ {"name": "f0", "type": "string"},
+ {"name": "f2", "type": "boolean"}
+ ]
+ }
+ }
+ ]
+ }
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc
new file mode 100644
index 000000000..fc1105911
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": ["string"]},
+ {"name": "f1", "type": ["string", "null"]},
+ {"name": "f2", "type": ["string", "int"]},
+ {"name": "f3", "type": "int"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc
new file mode 100644
index 000000000..308e027a2
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": ["string"]},
+ {"name": "f1", "type": ["string", "null"]},
+ {"name": "f3", "type": "int"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc
new file mode 100644
index 000000000..cbc83e566
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f0", "type": ["string"]},
+ {"name": "f2", "type": ["string", "int"]},
+ {"name": "f3", "type": "int"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc
new file mode 100644
index 000000000..0f72fb432
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {"name": "f1", "type": ["string", "null"]},
+ {"name": "f2", "type": ["string", "int"]},
+ {"name": "f3", "type": ["string", "int"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc
new file mode 100644
index 000000000..92c0873de
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc
new file mode 100644
index 000000000..5b75a4031
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "array",
+ "items": "string",
+ "name": "testArray"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc
new file mode 100644
index 000000000..a4d96e9ab
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "fixed",
+ "size": 6,
+ "name": "testFixed"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc
new file mode 100644
index 000000000..f784ae623
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testLargeData",
+ "fields": [
+ {
+ "name": "f0",
+ "type": {
+ "name" : "f0",
+ "type" : "enum",
+ "symbols" : ["value1", "value2", "value3", "value4", "value5"]
+ }
+ },
+ {
+ "name" : "f1",
+ "type" : {
+ "type" : "record",
+ "name" : "nestedRecord",
+ "fields": [
+ {"name": "f1_0", "type": "string"},
+ {"name": "f1_1", "type": "int"}
+ ]
+ }
+ },
+
+ {"name": "f2", "type": "string"},
+ {"name": "f3", "type": "int"},
+ {"name": "f4", "type": "boolean"},
+ {"name": "f5", "type": "float"},
+ {"name": "f6", "type": "double"},
+ {"name": "f7", "type": "bytes"},
+ {"name": "f8", "type": ["string", "int"]},
+ {
+ "name": "f9",
+ "type": {
+ "name" : "f9",
+ "type" : "array",
+ "items" : "string"
+ }
+ },
+ {
+ "name": "f10",
+ "type": {
+ "name" : "f10",
+ "type" : "map",
+ "values" : "string"
+ }
+ },
+ {
+ "name": "f11",
+ "type": {
+ "type" : "fixed",
+ "name" : "f11",
+ "size" : 5
+ }
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc
new file mode 100644
index 000000000..0dfa3a595
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "map",
+ "values": "string",
+ "name": "testMap"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc
new file mode 100644
index 000000000..29dddfd1a
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testNestedRecord",
+ "fields": [
+ {
+ "name" : "f0",
+ "type" : {
+ "type" : "record",
+ "name" : "nestedInRecord",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f1", "type": "int"}
+ ]
+ }
+ }
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
new file mode 100644
index 000000000..62af1a85d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBoolean",
+ "fields": [
+ {"name": "f0", "type": ["null", "boolean"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
new file mode 100644
index 000000000..002bc7ce2
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBytes",
+ "fields": [
+ {"name": "f0", "type": ["null", "bytes"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
new file mode 100644
index 000000000..642b7aa16
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableDouble",
+ "fields": [
+ {"name": "f0", "type": ["null", "double"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
new file mode 100644
index 000000000..dff285909
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableFloat",
+ "fields": [
+ {"name": "f0", "type": ["null", "float"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
new file mode 100644
index 000000000..abb2fc48a
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableInt",
+ "fields": [
+ {"name": "f0", "type": ["null", "int"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
new file mode 100644
index 000000000..0624d2737
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableLong",
+ "fields": [
+ {"name": "f0", "type": ["null", "long"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
new file mode 100644
index 000000000..347808ce6
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableString",
+ "fields": [
+ {"name": "f0", "type": ["null", "string"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
new file mode 100644
index 000000000..af94812d7
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testNullableUnions",
+ "fields": [
+ {"name": "f0", "type": ["string", "int", "null"]}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc
new file mode 100644
index 000000000..7652ce723
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "boolean",
+ "name": "TestBoolean"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc
new file mode 100644
index 000000000..5102430b6
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "bytes",
+ "name": "TestBytes"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc
new file mode 100644
index 000000000..d1ae0b605
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "double",
+ "name": "TestDouble"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc
new file mode 100644
index 000000000..bd8df6102
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "enum",
+ "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"],
+ "name": "testEnum"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc
new file mode 100644
index 000000000..675d1090d
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "float",
+ "name": "TestFloat"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc
new file mode 100644
index 000000000..8fc848828
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "int",
+ "name": "TestInt"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc
new file mode 100644
index 000000000..b9706107c
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "long",
+ "name": "TestLong"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc
new file mode 100644
index 000000000..b4a89a7f6
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "string",
+ "name": "TestString"
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc
new file mode 100644
index 000000000..e83cf1180
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testRecord",
+ "fields": [
+ {"name": "f0", "type": "string"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "boolean"}
+ ]
+}
diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc
new file mode 100644
index 000000000..f181e36e3
--- /dev/null
+++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testUnions",
+ "fields": [
+ {"name": "f0", "type": ["string", "int"]}
+ ]
+}