diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/adapter/avro | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/adapter/avro')
100 files changed, 6275 insertions, 0 deletions
diff --git a/src/arrow/java/adapter/avro/pom.xml b/src/arrow/java/adapter/avro/pom.xml new file mode 100644 index 000000000..1f3fea849 --- /dev/null +++ b/src/arrow/java/adapter/avro/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>6.0.1</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>arrow-avro</artifactId> + <name>Arrow AVRO Adapter</name> + <description>(Contrib/Experimental) A library for converting Avro data to Arrow data.</description> + <url>http://maven.apache.org</url> + + <dependencies> + + <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-core --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${dep.avro.version}</version> + </dependency> + </dependencies> + +</project> diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java new file mode 100644 index 000000000..9fb5ce291 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.io.IOException; + +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.io.Decoder; + +/** + * Utility class to convert Avro objects to columnar Arrow format objects. + */ +public class AvroToArrow { + + /** + * Fetch the data from {@link Decoder} and convert it to Arrow objects. + * Only for testing purpose. + * @param schema avro schema. + * @param decoder avro decoder + * @param config configuration of the conversion. + * @return Arrow Data Objects {@link VectorSchemaRoot} + */ + static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, AvroToArrowConfig config) + throws IOException { + Preconditions.checkNotNull(schema, "Avro schema object can not be null"); + Preconditions.checkNotNull(decoder, "Avro decoder object can not be null"); + Preconditions.checkNotNull(config, "config can not be null"); + + return AvroToArrowUtils.avroToArrowVectors(schema, decoder, config); + } + + /** + * Fetch the data from {@link Decoder} and iteratively convert it to Arrow objects. + * @param schema avro schema + * @param decoder avro decoder + * @param config configuration of the conversion. + * @throws IOException on error + */ + public static AvroToArrowVectorIterator avroToArrowIterator( + Schema schema, + Decoder decoder, + AvroToArrowConfig config) throws IOException { + + Preconditions.checkNotNull(schema, "Avro schema object can not be null"); + Preconditions.checkNotNull(decoder, "Avro decoder object can not be null"); + Preconditions.checkNotNull(config, "config can not be null"); + + return AvroToArrowVectorIterator.create(decoder, schema, config); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java new file mode 100644 index 000000000..4f59ef384 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.util.Set; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.dictionary.DictionaryProvider; + +/** + * This class configures the Avro-to-Arrow conversion process. + */ +public class AvroToArrowConfig { + + private final BufferAllocator allocator; + /** + * The maximum rowCount to read each time when partially convert data. + * Default value is 1024 and -1 means read all data into one vector. + */ + private final int targetBatchSize; + + /** + * The dictionary provider used for enum type. + * If avro schema has enum type, will create dictionary and update this provider. + */ + private final DictionaryProvider.MapDictionaryProvider provider; + + /** + * The field names which to skip when reading decoder values. + */ + private final Set<String> skipFieldNames; + + /** + * Instantiate an instance. + * @param allocator The memory allocator to construct the Arrow vectors with. + * @param targetBatchSize The maximum rowCount to read each time when partially convert data. + * @param provider The dictionary provider used for enum type, adapter will update this provider. + * @param skipFieldNames Field names which to skip. + */ + AvroToArrowConfig( + BufferAllocator allocator, + int targetBatchSize, + DictionaryProvider.MapDictionaryProvider provider, + Set<String> skipFieldNames) { + + Preconditions.checkArgument(targetBatchSize == AvroToArrowVectorIterator.NO_LIMIT_BATCH_SIZE || + targetBatchSize > 0, "invalid targetBatchSize: %s", targetBatchSize); + + this.allocator = allocator; + this.targetBatchSize = targetBatchSize; + this.provider = provider; + this.skipFieldNames = skipFieldNames; + } + + public BufferAllocator getAllocator() { + return allocator; + } + + public int getTargetBatchSize() { + return targetBatchSize; + } + + public DictionaryProvider.MapDictionaryProvider getProvider() { + return provider; + } + + public Set<String> getSkipFieldNames() { + return skipFieldNames; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java new file mode 100644 index 000000000..474c1eb5c --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfigBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.dictionary.DictionaryProvider; + +/** + * This class builds {@link AvroToArrowConfig}s. + */ +public class AvroToArrowConfigBuilder { + + private BufferAllocator allocator; + + private int targetBatchSize; + + private DictionaryProvider.MapDictionaryProvider provider; + + private Set<String> skipFieldNames; + + /** + * Default constructor for the {@link AvroToArrowConfigBuilder}. + */ + public AvroToArrowConfigBuilder(BufferAllocator allocator) { + this.allocator = allocator; + this.targetBatchSize = AvroToArrowVectorIterator.DEFAULT_BATCH_SIZE; + this.provider = new DictionaryProvider.MapDictionaryProvider(); + this.skipFieldNames = new HashSet<>(); + } + + public AvroToArrowConfigBuilder setTargetBatchSize(int targetBatchSize) { + this.targetBatchSize = targetBatchSize; + return this; + } + + public AvroToArrowConfigBuilder setProvider(DictionaryProvider.MapDictionaryProvider provider) { + this.provider = provider; + return this; + } + + public AvroToArrowConfigBuilder setSkipFieldNames(Set<String> skipFieldNames) { + this.skipFieldNames = skipFieldNames; + return this; + } + + /** + * This builds the {@link AvroToArrowConfig} from the provided params. + */ + public AvroToArrowConfig build() { + return new AvroToArrowConfig( + allocator, + targetBatchSize, + provider, + skipFieldNames); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java new file mode 100644 index 000000000..80293c8b8 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; +import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.arrow.consumers.AvroArraysConsumer; +import org.apache.arrow.consumers.AvroBooleanConsumer; +import org.apache.arrow.consumers.AvroBytesConsumer; +import org.apache.arrow.consumers.AvroDoubleConsumer; +import org.apache.arrow.consumers.AvroEnumConsumer; +import org.apache.arrow.consumers.AvroFixedConsumer; +import org.apache.arrow.consumers.AvroFloatConsumer; +import org.apache.arrow.consumers.AvroIntConsumer; +import org.apache.arrow.consumers.AvroLongConsumer; +import org.apache.arrow.consumers.AvroMapConsumer; +import org.apache.arrow.consumers.AvroNullConsumer; +import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.AvroStructConsumer; +import org.apache.arrow.consumers.AvroUnionsConsumer; +import org.apache.arrow.consumers.CompositeAvroConsumer; +import org.apache.arrow.consumers.Consumer; +import org.apache.arrow.consumers.SkipConsumer; +import org.apache.arrow.consumers.SkipFunction; +import org.apache.arrow.consumers.logical.AvroDateConsumer; +import org.apache.arrow.consumers.logical.AvroDecimalConsumer; +import org.apache.arrow.consumers.logical.AvroTimeMicroConsumer; +import org.apache.arrow.consumers.logical.AvroTimeMillisConsumer; +import org.apache.arrow.consumers.logical.AvroTimestampMicrosConsumer; +import org.apache.arrow.consumers.logical.AvroTimestampMillisConsumer; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BaseIntVector; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.UnionVector; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.UnionMode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.JsonStringArrayList; +import org.apache.arrow.vector.util.ValueVectorUtility; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.io.Decoder; + +/** + * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. + */ +public class AvroToArrowUtils { + + /** + * Creates a {@link Consumer} from the {@link Schema} + * + <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types. + * + * <ul> + * <li>STRING --> ArrowType.Utf8</li> + * <li>INT --> ArrowType.Int(32, signed)</li> + * <li>LONG --> ArrowType.Int(64, signed)</li> + * <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li> + * <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li> + * <li>BOOLEAN --> ArrowType.Bool</li> + * <li>BYTES --> ArrowType.Binary</li> + * <li>ARRAY --> ArrowType.List</li> + * <li>MAP --> ArrowType.Map</li> + * <li>FIXED --> ArrowType.FixedSizeBinary</li> + * <li>RECORD --> ArrowType.Struct</li> + * <li>UNION --> ArrowType.Union</li> + * <li>ENUM--> ArrowType.Int</li> + * <li>DECIMAL --> ArrowType.Decimal</li> + * <li>Date --> ArrowType.Date(DateUnit.DAY)</li> + * <li>TimeMillis --> ArrowType.Time(TimeUnit.MILLISECOND, 32)</li> + * <li>TimeMicros --> ArrowType.Time(TimeUnit.MICROSECOND, 64)</li> + * <li>TimestampMillis --> ArrowType.Timestamp(TimeUnit.MILLISECOND, null)</li> + * <li>TimestampMicros --> ArrowType.Timestamp(TimeUnit.MICROSECOND, null)</li> + * </ul> + */ + + private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config) { + return createConsumer(schema, name, false, config, null); + } + + private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config, FieldVector vector) { + return createConsumer(schema, name, false, config, vector); + } + + /** + * Create a consumer with the given Avro schema. + * + * @param schema avro schema + * @param name arrow field name + * @param consumerVector vector to keep in consumer, if v == null, will create a new vector via field. + * @return consumer + */ + private static Consumer createConsumer( + Schema schema, + String name, + boolean nullable, + AvroToArrowConfig config, + FieldVector consumerVector) { + + Preconditions.checkNotNull(schema, "Avro schema object can't be null"); + Preconditions.checkNotNull(config, "Config can't be null"); + + final BufferAllocator allocator = config.getAllocator(); + + final Type type = schema.getType(); + final LogicalType logicalType = schema.getLogicalType(); + + final ArrowType arrowType; + final FieldType fieldType; + final FieldVector vector; + final Consumer consumer; + + switch (type) { + case UNION: + consumer = createUnionConsumer(schema, name, config, consumerVector); + break; + case ARRAY: + consumer = createArrayConsumer(schema, name, config, consumerVector); + break; + case MAP: + consumer = createMapConsumer(schema, name, config, consumerVector); + break; + case RECORD: + consumer = createStructConsumer(schema, name, config, consumerVector); + break; + case ENUM: + consumer = createEnumConsumer(schema, name, config, consumerVector); + break; + case STRING: + arrowType = new ArrowType.Utf8(); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroStringConsumer((VarCharVector) vector); + break; + case FIXED: + Map<String, String> extProps = createExternalProps(schema); + if (logicalType instanceof LogicalTypes.Decimal) { + arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroDecimalConsumer.FixedDecimalConsumer((DecimalVector) vector, schema.getFixedSize()); + } else { + arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); + } + break; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + arrowType = new ArrowType.Date(DateUnit.DAY); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroDateConsumer((DateDayVector) vector); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector); + } else { + arrowType = new ArrowType.Int(32, /*signed=*/true); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroIntConsumer((IntVector) vector); + } + break; + case BOOLEAN: + arrowType = new ArrowType.Bool(); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroBooleanConsumer((BitVector) vector); + break; + case LONG: + if (logicalType instanceof LogicalTypes.TimeMicros) { + arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector); + } else { + arrowType = new ArrowType.Int(64, /*signed=*/true); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroLongConsumer((BigIntVector) vector); + } + break; + case FLOAT: + arrowType = new ArrowType.FloatingPoint(SINGLE); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroFloatConsumer((Float4Vector) vector); + break; + case DOUBLE: + arrowType = new ArrowType.FloatingPoint(DOUBLE); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroDoubleConsumer((Float8Vector) vector); + break; + case BYTES: + if (logicalType instanceof LogicalTypes.Decimal) { + arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector); + } else { + arrowType = new ArrowType.Binary(); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = createVector(consumerVector, fieldType, name, allocator); + consumer = new AvroBytesConsumer((VarBinaryVector) vector); + } + break; + case NULL: + arrowType = new ArrowType.Null(); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallback=*/null); + consumer = new AvroNullConsumer((NullVector) vector); + break; + default: + // no-op, shouldn't get here + throw new UnsupportedOperationException("Can't convert avro type %s to arrow type." + type.getName()); + } + return consumer; + } + + private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType) { + final int scale = logicalType.getScale(); + final int precision = logicalType.getPrecision(); + Preconditions.checkArgument(precision > 0 && precision <= 38, + "Precision must be in range of 1 to 38"); + Preconditions.checkArgument(scale >= 0 && scale <= 38, + "Scale must be in range of 0 to 38."); + Preconditions.checkArgument(scale <= precision, + "Invalid decimal scale: %s (greater than precision: %s)", scale, precision); + + return new ArrowType.Decimal(precision, scale, 128); + + } + + private static Consumer createSkipConsumer(Schema schema) { + + SkipFunction skipFunction; + Type type = schema.getType(); + + switch (type) { + case UNION: + List<Consumer> unionDelegates = schema.getTypes().stream().map(s -> + createSkipConsumer(s)).collect(Collectors.toList()); + skipFunction = decoder -> unionDelegates.get(decoder.readInt()).consume(decoder); + + break; + case ARRAY: + Consumer elementDelegate = createSkipConsumer(schema.getElementType()); + skipFunction = decoder -> { + for (long i = decoder.skipArray(); i != 0; i = decoder.skipArray()) { + for (long j = 0; j < i; j++) { + elementDelegate.consume(decoder); + } + } + }; + break; + case MAP: + Consumer valueDelegate = createSkipConsumer(schema.getValueType()); + skipFunction = decoder -> { + for (long i = decoder.skipMap(); i != 0; i = decoder.skipMap()) { + for (long j = 0; j < i; j++) { + decoder.skipString(); // Discard key + valueDelegate.consume(decoder); + } + } + }; + break; + case RECORD: + List<Consumer> delegates = schema.getFields().stream().map(field -> + createSkipConsumer(field.schema())).collect(Collectors.toList()); + + skipFunction = decoder -> { + for (Consumer consumer : delegates) { + consumer.consume(decoder); + } + }; + + break; + case ENUM: + skipFunction = decoder -> decoder.readEnum(); + break; + case STRING: + skipFunction = decoder -> decoder.skipString(); + break; + case FIXED: + skipFunction = decoder -> decoder.skipFixed(schema.getFixedSize()); + break; + case INT: + skipFunction = decoder -> decoder.readInt(); + break; + case BOOLEAN: + skipFunction = decoder -> decoder.skipFixed(1); + break; + case LONG: + skipFunction = decoder -> decoder.readLong(); + break; + case FLOAT: + skipFunction = decoder -> decoder.readFloat(); + break; + case DOUBLE: + skipFunction = decoder -> decoder.readDouble(); + break; + case BYTES: + skipFunction = decoder -> decoder.skipBytes(); + break; + case NULL: + skipFunction = decoder -> { }; + break; + default: + // no-op, shouldn't get here + throw new UnsupportedOperationException("Invalid avro type: " + type.getName()); + } + + return new SkipConsumer(skipFunction); + } + + static CompositeAvroConsumer createCompositeConsumer( + Schema schema, AvroToArrowConfig config) { + + List<Consumer> consumers = new ArrayList<>(); + final Set<String> skipFieldNames = config.getSkipFieldNames(); + + Schema.Type type = schema.getType(); + if (type == Type.RECORD) { + for (Schema.Field field : schema.getFields()) { + if (skipFieldNames.contains(field.name())) { + consumers.add(createSkipConsumer(field.schema())); + } else { + Consumer consumer = createConsumer(field.schema(), field.name(), config); + consumers.add(consumer); + } + + } + } else { + Consumer consumer = createConsumer(schema, "", config); + consumers.add(consumer); + } + + return new CompositeAvroConsumer(consumers); + } + + private static FieldVector createVector(FieldVector consumerVector, FieldType fieldType, + String name, BufferAllocator allocator) { + return consumerVector != null ? consumerVector : fieldType.createNewSingleVector(name, allocator, null); + } + + private static String getDefaultFieldName(ArrowType type) { + Types.MinorType minorType = Types.getMinorTypeForArrowType(type); + return minorType.name().toLowerCase(); + } + + private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) { + return avroSchemaToField(schema, name, config, null); + } + + private static Field avroSchemaToField( + Schema schema, + String name, + AvroToArrowConfig config, + Map<String, String> externalProps) { + + final Type type = schema.getType(); + final LogicalType logicalType = schema.getLogicalType(); + final List<Field> children = new ArrayList<>(); + final FieldType fieldType; + + switch (type) { + case UNION: + for (int i = 0; i < schema.getTypes().size(); i++) { + Schema childSchema = schema.getTypes().get(i); + // Union child vector should use default name + children.add(avroSchemaToField(childSchema, null, config)); + } + fieldType = createFieldType(new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps); + break; + case ARRAY: + Schema elementSchema = schema.getElementType(); + children.add(avroSchemaToField(elementSchema, elementSchema.getName(), config)); + fieldType = createFieldType(new ArrowType.List(), schema, externalProps); + break; + case MAP: + // MapVector internal struct field and key field should be non-nullable + FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null); + Field keyField = new Field("key", keyFieldType, /*children=*/null); + Field valueField = avroSchemaToField(schema.getValueType(), "value", config); + + FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null); + Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField)); + children.add(structField); + fieldType = createFieldType(new ArrowType.Map(/*keySorted=*/false), schema, externalProps); + break; + case RECORD: + final Set<String> skipFieldNames = config.getSkipFieldNames(); + for (int i = 0; i < schema.getFields().size(); i++) { + final Schema.Field field = schema.getFields().get(i); + Schema childSchema = field.schema(); + String fullChildName = String.format("%s.%s", name, field.name()); + if (!skipFieldNames.contains(fullChildName)) { + final Map<String, String> extProps = new HashMap<>(); + String doc = field.doc(); + Set<String> aliases = field.aliases(); + if (doc != null) { + extProps.put("doc", doc); + } + if (aliases != null) { + extProps.put("aliases", convertAliases(aliases)); + } + children.add(avroSchemaToField(childSchema, fullChildName, config, extProps)); + } + } + fieldType = createFieldType(new ArrowType.Struct(), schema, externalProps); + break; + case ENUM: + DictionaryProvider.MapDictionaryProvider provider = config.getProvider(); + int current = provider.getDictionaryIds().size(); + int enumCount = schema.getEnumSymbols().size(); + ArrowType.Int indexType = DictionaryEncoder.getIndexType(enumCount); + + fieldType = createFieldType(indexType, schema, externalProps, + new DictionaryEncoding(current, /*ordered=*/false, /*indexType=*/indexType)); + break; + + case STRING: + fieldType = createFieldType(new ArrowType.Utf8(), schema, externalProps); + break; + case FIXED: + final ArrowType fixedArrowType; + if (logicalType instanceof LogicalTypes.Decimal) { + fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + } else { + fixedArrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); + } + fieldType = createFieldType(fixedArrowType, schema, externalProps); + break; + case INT: + final ArrowType intArrowType; + if (logicalType instanceof LogicalTypes.Date) { + intArrowType = new ArrowType.Date(DateUnit.DAY); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + intArrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32); + } else { + intArrowType = new ArrowType.Int(32, /*signed=*/true); + } + fieldType = createFieldType(intArrowType, schema, externalProps); + break; + case BOOLEAN: + fieldType = createFieldType(new ArrowType.Bool(), schema, externalProps); + break; + case LONG: + final ArrowType longArrowType; + if (logicalType instanceof LogicalTypes.TimeMicros) { + longArrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else { + longArrowType = new ArrowType.Int(64, /*signed=*/true); + } + fieldType = createFieldType(longArrowType, schema, externalProps); + break; + case FLOAT: + fieldType = createFieldType(new ArrowType.FloatingPoint(SINGLE), schema, externalProps); + break; + case DOUBLE: + fieldType = createFieldType(new ArrowType.FloatingPoint(DOUBLE), schema, externalProps); + break; + case BYTES: + final ArrowType bytesArrowType; + if (logicalType instanceof LogicalTypes.Decimal) { + bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); + } else { + bytesArrowType = new ArrowType.Binary(); + } + fieldType = createFieldType(bytesArrowType, schema, externalProps); + break; + case NULL: + fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps); + break; + default: + // no-op, shouldn't get here + throw new UnsupportedOperationException(); + } + + if (name == null) { + name = getDefaultFieldName(fieldType.getType()); + } + return new Field(name, fieldType, children.size() == 0 ? null : children); + } + + private static Consumer createArrayConsumer(Schema schema, String name, AvroToArrowConfig config, + FieldVector consumerVector) { + + ListVector listVector; + if (consumerVector == null) { + final Field field = avroSchemaToField(schema, name, config); + listVector = (ListVector) field.createVector(config.getAllocator()); + } else { + listVector = (ListVector) consumerVector; + } + + FieldVector dataVector = listVector.getDataVector(); + + // create delegate + Schema childSchema = schema.getElementType(); + Consumer delegate = createConsumer(childSchema, childSchema.getName(), config, dataVector); + + return new AvroArraysConsumer(listVector, delegate); + } + + private static Consumer createStructConsumer(Schema schema, String name, AvroToArrowConfig config, + FieldVector consumerVector) { + + final Set<String> skipFieldNames = config.getSkipFieldNames(); + + StructVector structVector; + if (consumerVector == null) { + final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); + structVector = (StructVector) field.createVector(config.getAllocator()); + } else { + structVector = (StructVector) consumerVector; + } + + Consumer[] delegates = new Consumer[schema.getFields().size()]; + int vectorIndex = 0; + for (int i = 0; i < schema.getFields().size(); i++) { + Schema.Field childField = schema.getFields().get(i); + Consumer delegate; + // use full name to distinguish fields have same names between parent and child fields. + final String fullChildName = String.format("%s.%s", name, childField.name()); + if (skipFieldNames.contains(fullChildName)) { + delegate = createSkipConsumer(childField.schema()); + } else { + delegate = createConsumer(childField.schema(), fullChildName, config, + structVector.getChildrenFromFields().get(vectorIndex++)); + } + + delegates[i] = delegate; + } + + return new AvroStructConsumer(structVector, delegates); + + } + + private static Consumer createEnumConsumer(Schema schema, String name, AvroToArrowConfig config, + FieldVector consumerVector) { + + BaseIntVector indexVector; + if (consumerVector == null) { + final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); + indexVector = (BaseIntVector) field.createVector(config.getAllocator()); + } else { + indexVector = (BaseIntVector) consumerVector; + } + + final int valueCount = schema.getEnumSymbols().size(); + VarCharVector dictVector = new VarCharVector(name, config.getAllocator()); + dictVector.allocateNewSafe(); + dictVector.setValueCount(valueCount); + for (int i = 0; i < valueCount; i++) { + dictVector.set(i, schema.getEnumSymbols().get(i).getBytes(StandardCharsets.UTF_8)); + } + Dictionary dictionary = + new Dictionary(dictVector, indexVector.getField().getDictionary()); + config.getProvider().put(dictionary); + + return new AvroEnumConsumer(indexVector); + + } + + private static Consumer createMapConsumer(Schema schema, String name, AvroToArrowConfig config, + FieldVector consumerVector) { + + MapVector mapVector; + if (consumerVector == null) { + final Field field = avroSchemaToField(schema, name, config); + mapVector = (MapVector) field.createVector(config.getAllocator()); + } else { + mapVector = (MapVector) consumerVector; + } + + // create delegate struct consumer + StructVector structVector = (StructVector) mapVector.getDataVector(); + + // keys in avro map are always assumed to be strings. + Consumer keyConsumer = new AvroStringConsumer( + (VarCharVector) structVector.getChildrenFromFields().get(0)); + Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), + config, structVector.getChildrenFromFields().get(1)); + + AvroStructConsumer internalConsumer = + new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer}); + + return new AvroMapConsumer(mapVector, internalConsumer); + } + + private static Consumer createUnionConsumer(Schema schema, String name, AvroToArrowConfig config, + FieldVector consumerVector) { + final int size = schema.getTypes().size(); + + final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Type.NULL); + + UnionVector unionVector; + if (consumerVector == null) { + final Field field = avroSchemaToField(schema, name, config); + unionVector = (UnionVector) field.createVector(config.getAllocator()); + } else { + unionVector = (UnionVector) consumerVector; + } + + List<FieldVector> childVectors = unionVector.getChildrenFromFields(); + + Consumer[] delegates = new Consumer[size]; + Types.MinorType[] types = new Types.MinorType[size]; + + for (int i = 0; i < size; i++) { + FieldVector child = childVectors.get(i); + Schema subSchema = schema.getTypes().get(i); + Consumer delegate = createConsumer(subSchema, subSchema.getName(), nullable, config, child); + delegates[i] = delegate; + types[i] = child.getMinorType(); + } + return new AvroUnionsConsumer(unionVector, delegates, types); + } + + /** + * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}. + * @param schema avro schema + * @param decoder avro decoder to read data from + */ + static VectorSchemaRoot avroToArrowVectors( + Schema schema, + Decoder decoder, + AvroToArrowConfig config) + throws IOException { + + List<FieldVector> vectors = new ArrayList<>(); + List<Consumer> consumers = new ArrayList<>(); + final Set<String> skipFieldNames = config.getSkipFieldNames(); + + Schema.Type type = schema.getType(); + if (type == Type.RECORD) { + for (Schema.Field field : schema.getFields()) { + if (skipFieldNames.contains(field.name())) { + consumers.add(createSkipConsumer(field.schema())); + } else { + Consumer consumer = createConsumer(field.schema(), field.name(), config); + consumers.add(consumer); + vectors.add(consumer.getVector()); + } + } + } else { + Consumer consumer = createConsumer(schema, "", config); + consumers.add(consumer); + vectors.add(consumer.getVector()); + } + + long validConsumerCount = consumers.stream().filter(c -> !c.skippable()).count(); + Preconditions.checkArgument(vectors.size() == validConsumerCount, + "vectors size not equals consumers size."); + + List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList()); + + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0); + + CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers); + + int valueCount = 0; + try { + while (true) { + ValueVectorUtility.ensureCapacity(root, valueCount + 1); + compositeConsumer.consume(decoder); + valueCount++; + } + } catch (EOFException eof) { + // reach the end of encoder stream. + root.setRowCount(valueCount); + } catch (Exception e) { + compositeConsumer.close(); + throw new UnsupportedOperationException("Error occurs while consume process.", e); + } + + return root; + } + + private static Map<String, String> getMetaData(Schema schema) { + Map<String, String> metadata = new HashMap<>(); + schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString())); + return metadata; + } + + private static Map<String, String> getMetaData(Schema schema, Map<String, String> externalProps) { + Map<String, String> metadata = getMetaData(schema); + if (externalProps != null) { + metadata.putAll(externalProps); + } + return metadata; + } + + /** + * Parse avro attributes and convert them to metadata. + */ + private static Map<String, String> createExternalProps(Schema schema) { + final Map<String, String> extProps = new HashMap<>(); + String doc = schema.getDoc(); + Set<String> aliases = schema.getAliases(); + if (doc != null) { + extProps.put("doc", doc); + } + if (aliases != null) { + extProps.put("aliases", convertAliases(aliases)); + } + return extProps; + } + + private static FieldType createFieldType(ArrowType arrowType, Schema schema, Map<String, String> externalProps) { + return createFieldType(arrowType, schema, externalProps, /*dictionary=*/null); + } + + private static FieldType createFieldType( + ArrowType arrowType, + Schema schema, + Map<String, String> externalProps, + DictionaryEncoding dictionary) { + + return new FieldType(/*nullable=*/false, arrowType, dictionary, + getMetaData(schema, externalProps)); + } + + private static String convertAliases(Set<String> aliases) { + JsonStringArrayList jsonList = new JsonStringArrayList(); + aliases.stream().forEach(a -> jsonList.add(a)); + return jsonList.toString(); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java new file mode 100644 index 000000000..1faa7595c --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.io.EOFException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.arrow.consumers.CompositeAvroConsumer; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.ValueVectorUtility; +import org.apache.avro.Schema; +import org.apache.avro.io.Decoder; + +/** + * VectorSchemaRoot iterator for partially converting avro data. + */ +public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoCloseable { + + public static final int NO_LIMIT_BATCH_SIZE = -1; + public static final int DEFAULT_BATCH_SIZE = 1024; + + private final Decoder decoder; + private final Schema schema; + + private final AvroToArrowConfig config; + + private CompositeAvroConsumer compositeConsumer; + + private org.apache.arrow.vector.types.pojo.Schema rootSchema; + + private VectorSchemaRoot nextBatch; + + private final int targetBatchSize; + + /** + * Construct an instance. + */ + private AvroToArrowVectorIterator( + Decoder decoder, + Schema schema, + AvroToArrowConfig config) { + + this.decoder = decoder; + this.schema = schema; + this.config = config; + this.targetBatchSize = config.getTargetBatchSize(); + + } + + /** + * Create a ArrowVectorIterator to partially convert data. + */ + public static AvroToArrowVectorIterator create( + Decoder decoder, + Schema schema, + AvroToArrowConfig config) { + + AvroToArrowVectorIterator iterator = new AvroToArrowVectorIterator(decoder, schema, config); + try { + iterator.initialize(); + return iterator; + } catch (Exception e) { + iterator.close(); + throw new RuntimeException("Error occurs while creating iterator.", e); + } + } + + private void initialize() { + // create consumers + compositeConsumer = AvroToArrowUtils.createCompositeConsumer(schema, config); + List<FieldVector> vectors = new ArrayList<>(); + compositeConsumer.getConsumers().forEach(c -> vectors.add(c.getVector())); + List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList()); + VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0); + rootSchema = root.getSchema(); + + load(root); + } + + private void consumeData(VectorSchemaRoot root) { + int readRowCount = 0; + try { + while ((targetBatchSize == NO_LIMIT_BATCH_SIZE || readRowCount < targetBatchSize)) { + compositeConsumer.consume(decoder); + readRowCount++; + } + + if (targetBatchSize == NO_LIMIT_BATCH_SIZE) { + while (true) { + ValueVectorUtility.ensureCapacity(root, readRowCount + 1); + compositeConsumer.consume(decoder); + readRowCount++; + } + } else { + while (readRowCount < targetBatchSize) { + compositeConsumer.consume(decoder); + readRowCount++; + } + } + + root.setRowCount(readRowCount); + } catch (EOFException eof) { + // reach the end of encoder stream. + root.setRowCount(readRowCount); + } catch (Exception e) { + compositeConsumer.close(); + throw new RuntimeException("Error occurs while consuming data.", e); + } + } + + // Loads the next schema root or null if no more rows are available. + private void load(VectorSchemaRoot root) { + final int targetBatchSize = config.getTargetBatchSize(); + if (targetBatchSize != NO_LIMIT_BATCH_SIZE) { + ValueVectorUtility.preAllocate(root, targetBatchSize); + } + + long validConsumerCount = compositeConsumer.getConsumers().stream().filter(c -> + !c.skippable()).count(); + Preconditions.checkArgument(root.getFieldVectors().size() == validConsumerCount, + "Schema root vectors size not equals to consumers size."); + + compositeConsumer.resetConsumerVectors(root); + + // consume data + consumeData(root); + + if (root.getRowCount() == 0) { + root.close(); + nextBatch = null; + } else { + nextBatch = root; + } + } + + @Override + public boolean hasNext() { + return nextBatch != null; + } + + /** + * Gets the next vector. The user is responsible for freeing its resources. + */ + public VectorSchemaRoot next() { + Preconditions.checkArgument(hasNext()); + VectorSchemaRoot returned = nextBatch; + try { + load(VectorSchemaRoot.create(rootSchema, config.getAllocator())); + } catch (Exception e) { + returned.close(); + throw new RuntimeException("Error occurs while getting next schema root.", e); + } + return returned; + } + + /** + * Clean up resources. + */ + public void close() { + if (nextBatch != null) { + nextBatch.close(); + } + compositeConsumer.close(); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java new file mode 100644 index 000000000..b9d0f84cf --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.complex.ListVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume array type values from avro decoder. + * Write the data to {@link ListVector}. + */ +public class AvroArraysConsumer extends BaseAvroConsumer<ListVector> { + + private final Consumer delegate; + + /** + * Instantiate a ArrayConsumer. + */ + public AvroArraysConsumer(ListVector vector, Consumer delegate) { + super(vector); + this.delegate = delegate; + } + + @Override + public void consume(Decoder decoder) throws IOException { + + vector.startNewValue(currentIndex); + long totalCount = 0; + for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) { + totalCount += count; + ensureInnerVectorCapacity(totalCount); + for (int element = 0; element < count; element++) { + delegate.consume(decoder); + } + } + vector.endValue(currentIndex, (int) totalCount); + currentIndex++; + } + + @Override + public void close() throws Exception { + super.close(); + delegate.close(); + } + + @Override + public boolean resetValueVector(ListVector vector) { + this.delegate.resetValueVector(vector.getDataVector()); + return super.resetValueVector(vector); + } + + void ensureInnerVectorCapacity(long targetCapacity) { + while (vector.getDataVector().getValueCapacity() < targetCapacity) { + vector.getDataVector().reAlloc(); + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java new file mode 100644 index 000000000..4ca5f2445 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BitVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume boolean type values from avro decoder. + * Write the data to {@link BitVector}. + */ +public class AvroBooleanConsumer extends BaseAvroConsumer<BitVector> { + + /** + * Instantiate a AvroBooleanConsumer. + */ + public AvroBooleanConsumer(BitVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex, decoder.readBoolean() ? 1 : 0); + currentIndex++; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java new file mode 100644 index 000000000..eede68ebd --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume bytes type values from avro decoder. + * Write the data to {@link VarBinaryVector}. + */ +public class AvroBytesConsumer extends BaseAvroConsumer<VarBinaryVector> { + + private ByteBuffer cacheBuffer; + + /** + * Instantiate a AvroBytesConsumer. + */ + public AvroBytesConsumer(VarBinaryVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + // cacheBuffer is initialized null and create in the first consume, + // if its capacity < size to read, decoder will create a new one with new capacity. + cacheBuffer = decoder.readBytes(cacheBuffer); + vector.setSafe(currentIndex, cacheBuffer, 0, cacheBuffer.limit()); + currentIndex++; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java new file mode 100644 index 000000000..356707a14 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float8Vector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume double type values from avro decoder. + * Write the data to {@link Float8Vector}. + */ +public class AvroDoubleConsumer extends BaseAvroConsumer<Float8Vector> { + + /** + * Instantiate a AvroDoubleConsumer. + */ + public AvroDoubleConsumer(Float8Vector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readDouble()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java new file mode 100644 index 000000000..2f4443b74 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroEnumConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BaseIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume enum type values from avro decoder. + * Write the data to {@link IntVector}. + */ +public class AvroEnumConsumer extends BaseAvroConsumer<BaseIntVector> { + + /** + * Instantiate a AvroEnumConsumer. + */ + public AvroEnumConsumer(BaseIntVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.setWithPossibleTruncate(currentIndex++, decoder.readEnum()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java new file mode 100644 index 000000000..a065466e3 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume fixed type values from avro decoder. + * Write the data to {@link org.apache.arrow.vector.FixedSizeBinaryVector}. + */ +public class AvroFixedConsumer extends BaseAvroConsumer<FixedSizeBinaryVector> { + + private final byte[] reuseBytes; + + /** + * Instantiate a AvroFixedConsumer. + */ + public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) { + super(vector); + reuseBytes = new byte[size]; + } + + @Override + public void consume(Decoder decoder) throws IOException { + decoder.readFixed(reuseBytes); + vector.setSafe(currentIndex++, reuseBytes); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java new file mode 100644 index 000000000..c8de4a21a --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float4Vector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume float type values from avro decoder. + * Write the data to {@link Float4Vector}. + */ +public class AvroFloatConsumer extends BaseAvroConsumer<Float4Vector> { + + /** + * Instantiate a AvroFloatConsumer. + */ + public AvroFloatConsumer(Float4Vector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readFloat()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java new file mode 100644 index 000000000..bc8d4de78 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.IntVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume int type values from avro decoder. + * Write the data to {@link IntVector}. + */ +public class AvroIntConsumer extends BaseAvroConsumer<IntVector> { + + /** + * Instantiate a AvroIntConsumer. + */ + public AvroIntConsumer(IntVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readInt()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java new file mode 100644 index 000000000..b9016c58f --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume long type values from avro decoder. + * Write the data to {@link BigIntVector}. + */ +public class AvroLongConsumer extends BaseAvroConsumer<BigIntVector> { + + /** + * Instantiate a AvroLongConsumer. + */ + public AvroLongConsumer(BigIntVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java new file mode 100644 index 000000000..b8e8bd585 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume map type values from avro decoder. + * Write the data to {@link MapVector}. + */ +public class AvroMapConsumer extends BaseAvroConsumer<MapVector> { + + private final Consumer delegate; + + /** + * Instantiate a AvroMapConsumer. + */ + public AvroMapConsumer(MapVector vector, Consumer delegate) { + super(vector); + this.delegate = delegate; + } + + @Override + public void consume(Decoder decoder) throws IOException { + + vector.startNewValue(currentIndex); + long totalCount = 0; + for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) { + totalCount += count; + ensureInnerVectorCapacity(totalCount); + for (int element = 0; element < count; element++) { + delegate.consume(decoder); + } + } + vector.endValue(currentIndex, (int) totalCount); + currentIndex++; + } + + @Override + public void close() throws Exception { + super.close(); + delegate.close(); + } + + @Override + public boolean resetValueVector(MapVector vector) { + this.delegate.resetValueVector(vector.getDataVector()); + return super.resetValueVector(vector); + } + + void ensureInnerVectorCapacity(long targetCapacity) { + StructVector innerVector = (StructVector) vector.getDataVector(); + for (FieldVector v : innerVector.getChildrenFromFields()) { + while (v.getValueCapacity() < targetCapacity) { + v.reAlloc(); + } + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java new file mode 100644 index 000000000..64768008a --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.NullVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume null type values from avro decoder. + * Corresponding to {@link org.apache.arrow.vector.NullVector}. + */ +public class AvroNullConsumer extends BaseAvroConsumer<NullVector> { + + public AvroNullConsumer(NullVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + currentIndex++; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java new file mode 100644 index 000000000..10fe234ac --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.vector.VarCharVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume string type values from avro decoder. + * Write the data to {@link VarCharVector}. + */ +public class AvroStringConsumer extends BaseAvroConsumer<VarCharVector> { + + private ByteBuffer cacheBuffer; + + /** + * Instantiate a AvroStringConsumer. + */ + public AvroStringConsumer(VarCharVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + // cacheBuffer is initialized null and create in the first consume, + // if its capacity < size to read, decoder will create a new one with new capacity. + cacheBuffer = decoder.readBytes(cacheBuffer); + vector.setSafe(currentIndex++, cacheBuffer, 0, cacheBuffer.limit()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java new file mode 100644 index 000000000..792d01ee5 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume nested record type values from avro decoder. + * Write the data to {@link org.apache.arrow.vector.complex.StructVector}. + */ +public class AvroStructConsumer extends BaseAvroConsumer<StructVector> { + + private final Consumer[] delegates; + + /** + * Instantiate a AvroStructConsumer. + */ + public AvroStructConsumer(StructVector vector, Consumer[] delegates) { + super(vector); + this.delegates = delegates; + } + + @Override + public void consume(Decoder decoder) throws IOException { + + ensureInnerVectorCapacity(currentIndex + 1); + for (int i = 0; i < delegates.length; i++) { + delegates[i].consume(decoder); + } + vector.setIndexDefined(currentIndex); + currentIndex++; + + } + + @Override + public void close() throws Exception { + super.close(); + AutoCloseables.close(delegates); + } + + @Override + public boolean resetValueVector(StructVector vector) { + for (int i = 0; i < delegates.length; i++) { + delegates[i].resetValueVector(vector.getChildrenFromFields().get(i)); + } + return super.resetValueVector(vector); + } + + void ensureInnerVectorCapacity(long targetCapacity) { + for (FieldVector v : vector.getChildrenFromFields()) { + while (v.getValueCapacity() < targetCapacity) { + v.reAlloc(); + } + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java new file mode 100644 index 000000000..c0bb0200f --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.complex.UnionVector; +import org.apache.arrow.vector.types.Types; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume unions type values from avro decoder. + * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}. + */ +public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> { + + private Consumer[] delegates; + private Types.MinorType[] types; + + /** + * Instantiate an AvroUnionConsumer. + */ + public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) { + + super(vector); + this.delegates = delegates; + this.types = types; + } + + @Override + public void consume(Decoder decoder) throws IOException { + int fieldIndex = decoder.readInt(); + + ensureInnerVectorCapacity(currentIndex + 1, fieldIndex); + Consumer delegate = delegates[fieldIndex]; + + vector.setType(currentIndex, types[fieldIndex]); + // In UnionVector we need to set sub vector writer position before consume a value + // because in the previous iterations we might not have written to the specific union sub vector. + delegate.setPosition(currentIndex); + delegate.consume(decoder); + + currentIndex++; + } + + @Override + public void close() throws Exception { + super.close(); + AutoCloseables.close(delegates); + } + + @Override + public boolean resetValueVector(UnionVector vector) { + for (int i = 0; i < delegates.length; i++) { + delegates[i].resetValueVector(vector.getChildrenFromFields().get(i)); + } + return super.resetValueVector(vector); + } + + void ensureInnerVectorCapacity(long targetCapacity, int fieldIndex) { + ValueVector fieldVector = vector.getChildrenFromFields().get(fieldIndex); + if (fieldVector.getMinorType() == Types.MinorType.NULL) { + return; + } + while (fieldVector.getValueCapacity() < targetCapacity) { + fieldVector.reAlloc(); + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java new file mode 100644 index 000000000..303be8e50 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/BaseAvroConsumer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import org.apache.arrow.vector.FieldVector; + +/** + * Base class for non-skippable avro consumers. + * @param <T> vector type. + */ +public abstract class BaseAvroConsumer<T extends FieldVector> implements Consumer<T> { + + protected T vector; + protected int currentIndex; + + /** + * Constructs a base avro consumer. + * @param vector the vector to consume. + */ + public BaseAvroConsumer(T vector) { + this.vector = vector; + } + + @Override + public void addNull() { + currentIndex++; + } + + @Override + public void setPosition(int index) { + currentIndex = index; + } + + @Override + public FieldVector getVector() { + return vector; + } + + @Override + public void close() throws Exception { + vector.close(); + } + + @Override + public boolean resetValueVector(T vector) { + this.vector = vector; + this.currentIndex = 0; + return true; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java new file mode 100644 index 000000000..af476d27c --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.io.Decoder; + +/** + * Composite consumer which hold all consumers. + * It manages the consume and cleanup process. + */ +public class CompositeAvroConsumer implements AutoCloseable { + + private final List<Consumer> consumers; + + public List<Consumer> getConsumers() { + return consumers; + } + + public CompositeAvroConsumer(List<Consumer> consumers) { + this.consumers = consumers; + } + + /** + * Consume decoder data. + */ + public void consume(Decoder decoder) throws IOException { + for (Consumer consumer : consumers) { + consumer.consume(decoder); + } + } + + /** + * Reset vector of consumers with the given {@link VectorSchemaRoot}. + */ + public void resetConsumerVectors(VectorSchemaRoot root) { + int index = 0; + for (Consumer consumer : consumers) { + if (consumer.resetValueVector(root.getFieldVectors().get(index))) { + index++; + } + } + } + + @Override + public void close() { + // clean up + try { + AutoCloseables.close(consumers); + } catch (Exception e) { + throw new RuntimeException("Error occurs in close.", e); + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java new file mode 100644 index 000000000..8c4ee9a96 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FieldVector; +import org.apache.avro.io.Decoder; + +/** + * Interface that is used to consume values from avro decoder. + * @param <T> The vector within consumer or its delegate, used for partially consume purpose. + */ +public interface Consumer<T extends FieldVector> extends AutoCloseable { + + /** + * Consume a specific type value from avro decoder and write it to vector. + * @param decoder avro decoder to read data + * @throws IOException on error + */ + void consume(Decoder decoder) throws IOException; + + /** + * Add null value to vector by making writer position + 1. + */ + void addNull(); + + /** + * Set the position to write value into vector. + */ + void setPosition(int index); + + /** + * Get the vector within the consumer. + */ + FieldVector getVector(); + + /** + * Close this consumer when occurs exception to avoid potential leak. + */ + void close() throws Exception; + + /** + * Reset the vector within consumer for partial read purpose. + * @return true if reset is successful, false if reset is not needed. + */ + boolean resetValueVector(T vector); + + /** + * Indicates whether the consumer is type of {@link SkipConsumer}. + */ + default boolean skippable() { + return false; + } + +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java new file mode 100644 index 000000000..94c5b339d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipConsumer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FieldVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which skip (throw away) data from the decoder. + */ +public class SkipConsumer implements Consumer { + + private final SkipFunction skipFunction; + + public SkipConsumer(SkipFunction skipFunction) { + this.skipFunction = skipFunction; + } + + @Override + public void consume(Decoder decoder) throws IOException { + skipFunction.apply(decoder); + } + + @Override + public void addNull() { + } + + @Override + public void setPosition(int index) { + } + + @Override + public FieldVector getVector() { + return null; + } + + @Override + public void close() throws Exception { + } + + @Override + public boolean resetValueVector(FieldVector vector) { + return false; + } + + @Override + public boolean skippable() { + return true; + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java new file mode 100644 index 000000000..61938916a --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/SkipFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.avro.io.Decoder; + +/** + * Adapter function to skip (throw away) data from the decoder. + */ +@FunctionalInterface +public interface SkipFunction { + void apply(Decoder decoder) throws IOException; +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java new file mode 100644 index 000000000..3aa8970d9 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.DateDayVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume date type values from avro decoder. + * Write the data to {@link DateDayVector}. + */ +public class AvroDateConsumer extends BaseAvroConsumer<DateDayVector> { + + /** + * Instantiate a AvroDateConsumer. + */ + public AvroDateConsumer(DateDayVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readInt()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java new file mode 100644 index 000000000..24d73cf82 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.DecimalVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume decimal type values from avro decoder. + * Write the data to {@link DecimalVector}. + */ +public abstract class AvroDecimalConsumer extends BaseAvroConsumer<DecimalVector> { + + /** + * Instantiate a AvroDecimalConsumer. + */ + public AvroDecimalConsumer(DecimalVector vector) { + super(vector); + } + + /** + * Consumer for decimal logical type with original bytes type. + */ + public static class BytesDecimalConsumer extends AvroDecimalConsumer { + + private ByteBuffer cacheBuffer; + + /** + * Instantiate a BytesDecimalConsumer. + */ + public BytesDecimalConsumer(DecimalVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + cacheBuffer = decoder.readBytes(cacheBuffer); + byte[] bytes = new byte[cacheBuffer.limit()]; + Preconditions.checkArgument(bytes.length <= 16, "Decimal bytes length should <= 16."); + cacheBuffer.get(bytes); + vector.setBigEndian(currentIndex++, bytes); + } + + } + + /** + * Consumer for decimal logical type with original fixed type. + */ + public static class FixedDecimalConsumer extends AvroDecimalConsumer { + + private byte[] reuseBytes; + + /** + * Instantiate a FixedDecimalConsumer. + */ + public FixedDecimalConsumer(DecimalVector vector, int size) { + super(vector); + Preconditions.checkArgument(size <= 16, "Decimal bytes length should <= 16."); + reuseBytes = new byte[size]; + } + + @Override + public void consume(Decoder decoder) throws IOException { + decoder.readFixed(reuseBytes); + vector.setBigEndian(currentIndex++, reuseBytes); + } + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java new file mode 100644 index 000000000..e68ba158f --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume date time-micro values from avro decoder. + * Write the data to {@link TimeMicroVector}. + */ +public class AvroTimeMicroConsumer extends BaseAvroConsumer<TimeMicroVector> { + + /** + * Instantiate a AvroTimeMicroConsumer. + */ + public AvroTimeMicroConsumer(TimeMicroVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java new file mode 100644 index 000000000..f76186fc3 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume date time-millis values from avro decoder. + * Write the data to {@link TimeMilliVector}. + */ +public class AvroTimeMillisConsumer extends BaseAvroConsumer<TimeMilliVector> { + + /** + * Instantiate a AvroTimeMilliConsumer. + */ + public AvroTimeMillisConsumer(TimeMilliVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readInt()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java new file mode 100644 index 000000000..82da0e805 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume date timestamp-micro values from avro decoder. + * Write the data to {@link TimeStampMicroVector}. + */ +public class AvroTimestampMicrosConsumer extends BaseAvroConsumer<TimeStampMicroVector> { + + /** + * Instantiate a AvroTimestampMicroConsumer. + */ + public AvroTimestampMicrosConsumer(TimeStampMicroVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java new file mode 100644 index 000000000..159f49e14 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers.logical; + +import java.io.IOException; + +import org.apache.arrow.consumers.BaseAvroConsumer; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume date timestamp-millis values from avro decoder. + * Write the data to {@link TimeStampMilliVector}. + */ +public class AvroTimestampMillisConsumer extends BaseAvroConsumer<TimeStampMilliVector> { + + /** + * Instantiate a AvroTimestampMillisConsumer. + */ + public AvroTimestampMillisConsumer(TimeStampMilliVector vector) { + super(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + vector.set(currentIndex++, decoder.readLong()); + } +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java new file mode 100644 index 000000000..050a50dda --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroLogicalTypesTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static junit.framework.TestCase.assertNull; +import static junit.framework.TestCase.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.util.DateUtility; +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericFixed; +import org.junit.Test; + +public class AvroLogicalTypesTest extends AvroTestBase { + + @Test + public void testTimestampMicros() throws Exception { + Schema schema = getSchema("logical/test_timestamp_micros.avsc"); + + List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L); + List<LocalDateTime> expected = Arrays.asList( + DateUtility.getLocalDateTimeFromEpochMicro(10000), + DateUtility.getLocalDateTimeFromEpochMicro(20000), + DateUtility.getLocalDateTimeFromEpochMicro(30000), + DateUtility.getLocalDateTimeFromEpochMicro(40000), + DateUtility.getLocalDateTimeFromEpochMicro(50000) + ); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + + @Test + public void testTimestampMillis() throws Exception { + Schema schema = getSchema("logical/test_timestamp_millis.avsc"); + + List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L); + List<LocalDateTime> expected = Arrays.asList( + DateUtility.getLocalDateTimeFromEpochMilli(10000), + DateUtility.getLocalDateTimeFromEpochMilli(20000), + DateUtility.getLocalDateTimeFromEpochMilli(30000), + DateUtility.getLocalDateTimeFromEpochMilli(40000), + DateUtility.getLocalDateTimeFromEpochMilli(50000) + ); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + + @Test + public void testTimeMicros() throws Exception { + Schema schema = getSchema("logical/test_time_micros.avsc"); + + List<Long> data = Arrays.asList(10000L, 20000L, 30000L, 40000L, 50000L); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testTimeMillis() throws Exception { + Schema schema = getSchema("logical/test_time_millis.avsc"); + + List<Integer> data = Arrays.asList(100, 200, 300, 400, 500); + List<LocalDateTime> expected = Arrays.asList( + DateUtility.getLocalDateTimeFromEpochMilli(100), + DateUtility.getLocalDateTimeFromEpochMilli(200), + DateUtility.getLocalDateTimeFromEpochMilli(300), + DateUtility.getLocalDateTimeFromEpochMilli(400), + DateUtility.getLocalDateTimeFromEpochMilli(500) + ); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + + @Test + public void testDate() throws Exception { + Schema schema = getSchema("logical/test_date.avsc"); + + List<Integer> data = Arrays.asList(100, 200, 300, 400, 500); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testDecimalWithOriginalBytes() throws Exception { + Schema schema = getSchema("logical/test_decimal_with_original_bytes.avsc"); + List<ByteBuffer> data = new ArrayList<>(); + List<BigDecimal> expected = new ArrayList<>(); + + Conversions.DecimalConversion conversion = new Conversions.DecimalConversion(); + + for (int i = 0; i < 5; i++) { + BigDecimal value = new BigDecimal(i * i).setScale(2); + ByteBuffer buffer = conversion.toBytes(value, schema, schema.getLogicalType()); + data.add(buffer); + expected.add(value); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + checkPrimitiveResult(expected, vector); + + } + + @Test + public void testDecimalWithOriginalFixed() throws Exception { + Schema schema = getSchema("logical/test_decimal_with_original_fixed.avsc"); + + List<GenericFixed> data = new ArrayList<>(); + List<BigDecimal> expected = new ArrayList<>(); + + Conversions.DecimalConversion conversion = new Conversions.DecimalConversion(); + + for (int i = 0; i < 5; i++) { + BigDecimal value = new BigDecimal(i * i).setScale(2); + GenericFixed fixed = conversion.toFixed(value, schema, schema.getLogicalType()); + data.add(fixed); + expected.add(value); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + checkPrimitiveResult(expected, vector); + } + + @Test + public void testInvalidDecimalPrecision() throws Exception { + Schema schema = getSchema("logical/test_decimal_invalid1.avsc"); + List<ByteBuffer> data = new ArrayList<>(); + + Conversions.DecimalConversion conversion = new Conversions.DecimalConversion(); + + for (int i = 0; i < 5; i++) { + BigDecimal value = new BigDecimal(i * i).setScale(2); + ByteBuffer buffer = conversion.toBytes(value, schema, schema.getLogicalType()); + data.add(buffer); + } + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> writeAndRead(schema, data)); + assertTrue(e.getMessage().contains("Precision must be in range of 1 to 38")); + + } + + @Test + public void testFailedToCreateDecimalLogicalType() throws Exception { + // For decimal logical type, if avro validate schema failed, it will not create logical type, + // and the schema will be treated as its original type. + + // java.lang.IllegalArgumentException: Invalid decimal scale: -1 (must be positive) + Schema schema1 = getSchema("logical/test_decimal_invalid2.avsc"); + assertNull(schema1.getLogicalType()); + + // java.lang.IllegalArgumentException: Invalid decimal scale: 40 (greater than precision: 20) + Schema schema2 = getSchema("logical/test_decimal_invalid3.avsc"); + assertNull(schema2.getLogicalType()); + + // java.lang.IllegalArgumentException: fixed(1) cannot store 30 digits (max 2) + Schema schema3 = getSchema("logical/test_decimal_invalid4.avsc"); + assertNull(schema3.getLogicalType()); + } + +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java new file mode 100644 index 000000000..b946dbd86 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroSkipFieldTest.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.Types; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.Test; + +public class AvroSkipFieldTest extends AvroTestBase { + + @Test + public void testSkipUnionWithOneField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_union_before.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_union_one_field_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i % 2 == 0 ? "test" + i : null); + record.put(2, i % 2 == 0 ? "test" + i : i); + record.put(3, i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(1)); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedData.add(expectedRecord); + } + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipUnionWithNullableOneField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_union_before.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_union_nullable_field_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i % 2 == 0 ? "test" + i : null); + record.put(2, i % 2 == 0 ? "test" + i : i); + record.put(3, i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedData.add(expectedRecord); + } + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipUnionWithMultiFields() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f2"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_union_before.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_union_multi_fields_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i % 2 == 0 ? "test" + i : null); + record.put(2, i % 2 == 0 ? "test" + i : i); + record.put(3, i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(1)); + expectedRecord.put(2, record.get(3)); + expectedData.add(expectedRecord); + } + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipMapField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_map_before.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_map_expected.avsc"); + + HashMap map = new HashMap(); + map.put("key1", "value1"); + map.put("key2", "value3"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, map); + record.put(2, i % 2 == 0); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(2)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipArrayField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_array_before.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_array_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, Arrays.asList("test" + i, "test" + i)); + record.put(2, i % 2 == 0); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(2)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipMultiFields() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + skipFieldNames.add("f2"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("test_record.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_multi_fields_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i); + record.put(2, i % 2 == 0); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipStringField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f2"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base1.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_string_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + final byte[] testBytes = ("test" + i).getBytes(); + GenericRecord record = new GenericData.Record(schema); + GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema()); + fixed.bytes(testBytes); + record.put(0, fixed); + GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2); + record.put(1, symbol); + record.put(2, "testtest" + i); + record.put(3, ByteBuffer.wrap(testBytes)); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, testBytes); + expectedRecord.put(1, (byte) i % 2); + expectedRecord.put(2, testBytes); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipBytesField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f3"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base1.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_bytes_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + final byte[] testBytes = ("test" + i).getBytes(); + GenericRecord record = new GenericData.Record(schema); + GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema()); + fixed.bytes(testBytes); + record.put(0, fixed); + GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2); + record.put(1, symbol); + record.put(2, "testtest" + i); + record.put(3, ByteBuffer.wrap(testBytes)); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, testBytes); + expectedRecord.put(1, (byte) i % 2); + expectedRecord.put(2, record.get(2)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipFixedField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base1.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_fixed_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + final byte[] testBytes = ("test" + i).getBytes(); + GenericRecord record = new GenericData.Record(schema); + GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema()); + fixed.bytes(testBytes); + record.put(0, fixed); + GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2); + record.put(1, symbol); + record.put(2, "testtest" + i); + record.put(3, ByteBuffer.wrap(testBytes)); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, (byte) i % 2); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipEnumField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base1.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_fixed_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + final byte[] testBytes = ("test" + i).getBytes(); + GenericRecord record = new GenericData.Record(schema); + GenericData.Fixed fixed = new GenericData.Fixed(schema.getField("f0").schema()); + fixed.bytes(testBytes); + record.put(0, fixed); + GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(schema.getField("f1").schema(), "TEST" + i % 2); + record.put(1, symbol); + record.put(2, "testtest" + i); + record.put(3, ByteBuffer.wrap(testBytes)); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, testBytes); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipBooleanField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base2.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_boolean_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0); + record.put(1, i); + record.put(2, (long) i); + record.put(3, (float) i); + record.put(4, (double) i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(1)); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedRecord.put(3, record.get(4)); + + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipIntField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f1"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base2.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_int_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0); + record.put(1, i); + record.put(2, (long) i); + record.put(3, (float) i); + record.put(4, (double) i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(2)); + expectedRecord.put(2, record.get(3)); + expectedRecord.put(3, record.get(4)); + + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipLongField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f2"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base2.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_long_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0); + record.put(1, i); + record.put(2, (long) i); + record.put(3, (float) i); + record.put(4, (double) i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(1)); + expectedRecord.put(2, record.get(3)); + expectedRecord.put(3, record.get(4)); + + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipFloatField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f3"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base2.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_float_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0); + record.put(1, i); + record.put(2, (long) i); + record.put(3, (float) i); + record.put(4, (double) i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(1)); + expectedRecord.put(2, record.get(2)); + expectedRecord.put(3, record.get(4)); + + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipDoubleField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f4"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_base2.avsc"); + Schema expectedSchema = getSchema("skip/test_skip_double_expected.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0); + record.put(1, i); + record.put(2, (long) i); + record.put(3, (float) i); + record.put(4, (double) i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, record.get(0)); + expectedRecord.put(1, record.get(1)); + expectedRecord.put(2, record.get(2)); + expectedRecord.put(3, record.get(3)); + + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipRecordField() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("skip/test_skip_record_before.avsc"); + Schema nestedSchema = schema.getFields().get(0).schema(); + ArrayList<GenericRecord> data = new ArrayList<>(); + + Schema expectedSchema = getSchema("skip/test_skip_record_expected.avsc"); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + GenericRecord nestedRecord = new GenericData.Record(nestedSchema); + nestedRecord.put(0, "test" + i); + nestedRecord.put(1, i); + record.put(0, nestedRecord); + record.put(1, i); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + expectedRecord.put(0, i); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipNestedFields() throws Exception { + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0.f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + Schema schema = getSchema("test_nested_record.avsc"); + Schema nestedSchema = schema.getFields().get(0).schema(); + ArrayList<GenericRecord> data = new ArrayList<>(); + + Schema expectedSchema = getSchema("skip/test_skip_second_level_expected.avsc"); + Schema expectedNestedSchema = expectedSchema.getFields().get(0).schema(); + ArrayList<GenericRecord> expectedData = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + GenericRecord nestedRecord = new GenericData.Record(nestedSchema); + nestedRecord.put(0, "test" + i); + nestedRecord.put(1, i); + record.put(0, nestedRecord); + data.add(record); + + GenericRecord expectedRecord = new GenericData.Record(expectedSchema); + GenericRecord expectedNestedRecord = new GenericData.Record(expectedNestedSchema); + expectedNestedRecord.put(0, nestedRecord.get(1)); + expectedRecord.put(0, expectedNestedRecord); + expectedData.add(expectedRecord); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkNestedRecordResult(expectedSchema, expectedData, root); + } + + @Test + public void testSkipThirdLevelField() throws Exception { + Schema firstLevelSchema = getSchema("skip/test_skip_third_level_expected.avsc"); + Schema secondLevelSchema = firstLevelSchema.getFields().get(0).schema(); + Schema thirdLevelSchema = secondLevelSchema.getFields().get(0).schema(); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord firstLevelRecord = new GenericData.Record(firstLevelSchema); + GenericRecord secondLevelRecord = new GenericData.Record(secondLevelSchema); + GenericRecord thirdLevelRecord = new GenericData.Record(thirdLevelSchema); + + thirdLevelRecord.put(0, i); + thirdLevelRecord.put(1, "test" + i); + thirdLevelRecord.put(2, i % 2 == 0); + + secondLevelRecord.put(0, thirdLevelRecord); + firstLevelRecord.put(0, secondLevelRecord); + data.add(firstLevelRecord); + } + + // do not skip any fields first + VectorSchemaRoot root1 = writeAndRead(firstLevelSchema, data); + + assertEquals(1, root1.getFieldVectors().size()); + assertEquals(Types.MinorType.STRUCT, root1.getFieldVectors().get(0).getMinorType()); + StructVector secondLevelVector = (StructVector) root1.getFieldVectors().get(0); + assertEquals(1, secondLevelVector.getChildrenFromFields().size()); + assertEquals(Types.MinorType.STRUCT, secondLevelVector.getChildrenFromFields().get(0).getMinorType()); + StructVector thirdLevelVector = (StructVector) secondLevelVector.getChildrenFromFields().get(0); + assertEquals(3, thirdLevelVector.getChildrenFromFields().size()); + + // skip third level field and validate + Set<String> skipFieldNames = new HashSet<>(); + skipFieldNames.add("f0.f0.f0"); + config = new AvroToArrowConfigBuilder(config.getAllocator()).setSkipFieldNames(skipFieldNames).build(); + VectorSchemaRoot root2 = writeAndRead(firstLevelSchema, data); + + assertEquals(1, root2.getFieldVectors().size()); + assertEquals(Types.MinorType.STRUCT, root2.getFieldVectors().get(0).getMinorType()); + StructVector secondStruct = (StructVector) root2.getFieldVectors().get(0); + assertEquals(1, secondStruct.getChildrenFromFields().size()); + assertEquals(Types.MinorType.STRUCT, secondStruct.getChildrenFromFields().get(0).getMinorType()); + StructVector thirdStruct = (StructVector) secondStruct.getChildrenFromFields().get(0); + assertEquals(2, thirdStruct.getChildrenFromFields().size()); + + assertEquals(Types.MinorType.INT, thirdStruct.getChildrenFromFields().get(0).getMinorType()); + assertEquals(Types.MinorType.BIT, thirdStruct.getChildrenFromFields().get(1).getMinorType()); + } +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java new file mode 100644 index 000000000..a00cd7704 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.util.Text; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +public class AvroTestBase { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + protected AvroToArrowConfig config; + + @Before + public void init() { + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + config = new AvroToArrowConfigBuilder(allocator).build(); + } + + protected Schema getSchema(String schemaName) throws Exception { + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", schemaName); + return new Schema.Parser().parse(schemaPath.toFile()); + } + + protected VectorSchemaRoot writeAndRead(Schema schema, List data) throws Exception { + File dataFile = TMP.newFile(); + + BinaryEncoder + encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder + decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + + for (Object value : data) { + writer.write(value, encoder); + } + + return AvroToArrow.avroToArrow(schema, decoder, config); + } + + protected void checkArrayResult(List<List<?>> expected, ListVector vector) { + assertEquals(expected.size(), vector.getValueCount()); + for (int i = 0; i < expected.size(); i++) { + checkArrayElement(expected.get(i), vector.getObject(i)); + } + } + + protected void checkArrayElement(List expected, List actual) { + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + Object value1 = expected.get(i); + Object value2 = actual.get(i); + if (value1 == null) { + assertTrue(value2 == null); + continue; + } + if (value2 instanceof byte[]) { + value2 = ByteBuffer.wrap((byte[]) value2); + } else if (value2 instanceof Text) { + value2 = value2.toString(); + } + assertEquals(value1, value2); + } + } + + protected void checkPrimitiveResult(List data, FieldVector vector) { + assertEquals(data.size(), vector.getValueCount()); + for (int i = 0; i < data.size(); i++) { + Object value1 = data.get(i); + Object value2 = vector.getObject(i); + if (value1 == null) { + assertTrue(value2 == null); + continue; + } + if (value2 instanceof byte[]) { + value2 = ByteBuffer.wrap((byte[]) value2); + if (value1 instanceof byte[]) { + value1 = ByteBuffer.wrap((byte[]) value1); + } + } else if (value2 instanceof Text) { + value2 = value2.toString(); + } else if (value2 instanceof Byte) { + value2 = ((Byte) value2).intValue(); + } + assertEquals(value1, value2); + } + } + + protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, VectorSchemaRoot root) { + assertEquals(data.size(), root.getRowCount()); + assertEquals(schema.getFields().size(), root.getFieldVectors().size()); + + for (int i = 0; i < schema.getFields().size(); i++) { + ArrayList fieldData = new ArrayList(); + for (GenericRecord record : data) { + fieldData.add(record.get(i)); + } + + checkPrimitiveResult(fieldData, root.getFieldVectors().get(i)); + } + + } + + protected void checkNestedRecordResult(Schema schema, List<GenericRecord> data, VectorSchemaRoot root) { + assertEquals(data.size(), root.getRowCount()); + assertTrue(schema.getFields().size() == 1); + + final Schema nestedSchema = schema.getFields().get(0).schema(); + final StructVector structVector = (StructVector) root.getFieldVectors().get(0); + + for (int i = 0; i < nestedSchema.getFields().size(); i++) { + ArrayList fieldData = new ArrayList(); + for (GenericRecord record : data) { + GenericRecord nestedRecord = (GenericRecord) record.get(0); + fieldData.add(nestedRecord.get(i)); + } + + checkPrimitiveResult(fieldData, structVector.getChildrenFromFields().get(i)); + } + + } + + + // belows are for iterator api + + protected void checkArrayResult(List<List<?>> expected, List<ListVector> vectors) { + int valueCount = vectors.stream().mapToInt(v -> v.getValueCount()).sum(); + assertEquals(expected.size(), valueCount); + + int index = 0; + for (ListVector vector : vectors) { + for (int i = 0; i < vector.getValueCount(); i++) { + checkArrayElement(expected.get(index++), vector.getObject(i)); + } + } + } + + protected void checkRecordResult(Schema schema, ArrayList<GenericRecord> data, List<VectorSchemaRoot> roots) { + roots.forEach(root -> { + assertEquals(schema.getFields().size(), root.getFieldVectors().size()); + }); + + for (int i = 0; i < schema.getFields().size(); i++) { + List fieldData = new ArrayList(); + List<FieldVector> vectors = new ArrayList<>(); + for (GenericRecord record : data) { + fieldData.add(record.get(i)); + } + final int columnIndex = i; + roots.forEach(root -> vectors.add(root.getFieldVectors().get(columnIndex))); + + checkPrimitiveResult(fieldData, vectors); + } + + } + + protected void checkPrimitiveResult(List data, List<FieldVector> vectors) { + int valueCount = vectors.stream().mapToInt(v -> v.getValueCount()).sum(); + assertEquals(data.size(), valueCount); + + int index = 0; + for (FieldVector vector : vectors) { + for (int i = 0; i < vector.getValueCount(); i++) { + Object value1 = data.get(index++); + Object value2 = vector.getObject(i); + if (value1 == null) { + assertNull(value2); + continue; + } + if (value2 instanceof byte[]) { + value2 = ByteBuffer.wrap((byte[]) value2); + if (value1 instanceof byte[]) { + value1 = ByteBuffer.wrap((byte[]) value1); + } + } else if (value2 instanceof Text) { + value2 = value2.toString(); + } + assertEquals(value1, value2); + } + } + } +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java new file mode 100644 index 000000000..2b05a19f3 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.util.Utf8; +import org.junit.Test; + +public class AvroToArrowIteratorTest extends AvroTestBase { + + @Override + public void init() { + final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + this.config = new AvroToArrowConfigBuilder(allocator).setTargetBatchSize(3).build(); + } + + private AvroToArrowVectorIterator convert(Schema schema, List data) throws Exception { + File dataFile = TMP.newFile(); + + BinaryEncoder + encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder + decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + + for (Object value : data) { + writer.write(value, encoder); + } + + return AvroToArrow.avroToArrowIterator(schema, decoder, config); + } + + @Test + public void testStringType() throws Exception { + Schema schema = getSchema("test_primitive_string.avsc"); + List<String> data = Arrays.asList("v1", "v2", "v3", "v4", "v5"); + + List<VectorSchemaRoot> roots = new ArrayList<>(); + List<FieldVector> vectors = new ArrayList<>(); + try (AvroToArrowVectorIterator iterator = convert(schema, data)) { + while (iterator.hasNext()) { + VectorSchemaRoot root = iterator.next(); + FieldVector vector = root.getFieldVectors().get(0); + roots.add(root); + vectors.add(vector); + } + } + checkPrimitiveResult(data, vectors); + AutoCloseables.close(roots); + } + + @Test + public void testNullableStringType() throws Exception { + Schema schema = getSchema("test_nullable_string.avsc"); + + List<GenericRecord> data = new ArrayList<>(); + List<String> expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + String value = i % 2 == 0 ? "test" + i : null; + record.put(0, value); + expected.add(value); + data.add(record); + } + + List<VectorSchemaRoot> roots = new ArrayList<>(); + List<FieldVector> vectors = new ArrayList<>(); + try (AvroToArrowVectorIterator iterator = convert(schema, data);) { + while (iterator.hasNext()) { + VectorSchemaRoot root = iterator.next(); + FieldVector vector = root.getFieldVectors().get(0); + roots.add(root); + vectors.add(vector); + } + } + checkPrimitiveResult(expected, vectors); + AutoCloseables.close(roots); + + } + + @Test + public void testRecordType() throws Exception { + Schema schema = getSchema("test_record.avsc"); + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i); + record.put(2, i % 2 == 0); + data.add(record); + } + + List<VectorSchemaRoot> roots = new ArrayList<>(); + try (AvroToArrowVectorIterator iterator = convert(schema, data)) { + while (iterator.hasNext()) { + roots.add(iterator.next()); + } + } + checkRecordResult(schema, data, roots); + AutoCloseables.close(roots); + + } + + @Test + public void testArrayType() throws Exception { + Schema schema = getSchema("test_array.avsc"); + List<List<?>> data = Arrays.asList( + Arrays.asList("11", "222", "999"), + Arrays.asList("12222", "2333", "1000"), + Arrays.asList("1rrr", "2ggg"), + Arrays.asList("1vvv", "2bbb"), + Arrays.asList("1fff", "2")); + + List<VectorSchemaRoot> roots = new ArrayList<>(); + List<ListVector> vectors = new ArrayList<>(); + try (AvroToArrowVectorIterator iterator = convert(schema, data)) { + while (iterator.hasNext()) { + VectorSchemaRoot root = iterator.next(); + roots.add(root); + vectors.add((ListVector) root.getFieldVectors().get(0)); + } + } + checkArrayResult(data, vectors); + AutoCloseables.close(roots); + } + + @Test + public void runLargeNumberOfRows() throws Exception { + Schema schema = getSchema("test_large_data.avsc"); + int x = 0; + final int targetRows = 600000; + Decoder fakeDecoder = new FakeDecoder(targetRows); + try (AvroToArrowVectorIterator iter = AvroToArrow.avroToArrowIterator(schema, fakeDecoder, + new AvroToArrowConfigBuilder(config.getAllocator()).build())) { + while (iter.hasNext()) { + VectorSchemaRoot root = iter.next(); + x += root.getRowCount(); + root.close(); + } + } + + assertEquals(x, targetRows); + } + + /** + * Fake avro decoder to test large data. + */ + private class FakeDecoder extends Decoder { + + private int numRows; + + FakeDecoder(int numRows) { + this.numRows = numRows; + } + + // note that Decoder has no hasNext() API, assume enum is the first type in schema + // and fixed is the last type in schema and they are unique. + private void validate() throws EOFException { + if (numRows <= 0) { + throw new EOFException(); + } + } + + @Override + public void readNull() throws IOException { + } + + @Override + public boolean readBoolean() throws IOException { + return false; + } + + @Override + public int readInt() throws IOException { + return 0; + } + + @Override + public long readLong() throws IOException { + return 0; + } + + @Override + public float readFloat() throws IOException { + return 0; + } + + @Override + public double readDouble() throws IOException { + return 0; + } + + @Override + public Utf8 readString(Utf8 old) throws IOException { + return new Utf8("test123test123" + numRows); + } + + @Override + public String readString() throws IOException { + return "test123test123" + numRows; + } + + @Override + public void skipString() throws IOException { + + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + return ByteBuffer.allocate(0); + } + + @Override + public void skipBytes() throws IOException { + + } + + @Override + public void readFixed(byte[] bytes, int start, int length) throws IOException { + // fixed type is last column, after read value, decrease numRows + numRows--; + } + + @Override + public void skipFixed(int length) throws IOException { + + } + + @Override + public int readEnum() throws IOException { + // enum type is first column, validate numRows first. + validate(); + return 0; + } + + @Override + public long readArrayStart() throws IOException { + return 5; + } + + @Override + public long arrayNext() throws IOException { + return 0; + } + + @Override + public long skipArray() throws IOException { + return 0; + } + + @Override + public long readMapStart() throws IOException { + return 5; + } + + @Override + public long mapNext() throws IOException { + return 0; + } + + @Override + public long skipMap() throws IOException { + return 0; + } + + @Override + public int readIndex() throws IOException { + return 0; + } + } +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java new file mode 100644 index 000000000..c007e1ac7 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.Test; + +public class AvroToArrowTest extends AvroTestBase { + + @Test + public void testStringType() throws Exception { + Schema schema = getSchema("test_primitive_string.avsc"); + List<String> data = Arrays.asList("v1", "v2", "v3", "v4", "v5"); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableStringType() throws Exception { + Schema schema = getSchema("test_nullable_string.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? "test" + i : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testRecordType() throws Exception { + Schema schema = getSchema("test_record.avsc"); + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, "test" + i); + record.put(1, i); + record.put(2, i % 2 == 0); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testFixedAttributes() throws Exception { + Schema schema = getSchema("attrs/test_fixed_attr.avsc"); + + List<GenericData.Fixed> data = new ArrayList<>(); + List<byte[]> expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8); + expected.add(value); + GenericData.Fixed fixed = new GenericData.Fixed(schema); + fixed.bytes(value); + data.add(fixed); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + Map<String, String> metadata = vector.getField().getMetadata(); + assertEquals("fixed doc", metadata.get("doc")); + assertEquals("[\"alias1\",\"alias2\"]", metadata.get("aliases")); + } + + @Test + public void testEnumAttributes() throws Exception { + Schema schema = getSchema("attrs/test_enum_attrs.avsc"); + List<GenericData.EnumSymbol> data = Arrays.asList( + new GenericData.EnumSymbol(schema, "SPADES"), + new GenericData.EnumSymbol(schema, "HEARTS"), + new GenericData.EnumSymbol(schema, "DIAMONDS"), + new GenericData.EnumSymbol(schema, "CLUBS"), + new GenericData.EnumSymbol(schema, "SPADES")); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + Map<String, String> metadata = vector.getField().getMetadata(); + assertEquals("enum doc", metadata.get("doc")); + assertEquals("[\"alias1\",\"alias2\"]", metadata.get("aliases")); + } + + @Test + public void testRecordAttributes() throws Exception { + Schema schema = getSchema("attrs/test_record_attrs.avsc"); + Schema nestedSchema = schema.getFields().get(0).schema(); + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + GenericRecord nestedRecord = new GenericData.Record(nestedSchema); + nestedRecord.put(0, "test" + i); + nestedRecord.put(1, i); + record.put(0, nestedRecord); + + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + + StructVector structVector = (StructVector) root.getFieldVectors().get(0); + Map<String, String> structMeta = structVector.getField().getMetadata(); + Map<String, String> childMeta1 = structVector.getChildByOrdinal(0).getField().getMetadata(); + Map<String, String> childMeta2 = structVector.getChildByOrdinal(1).getField().getMetadata(); + + assertEquals("f0 doc", structMeta.get("doc")); + assertEquals("[\"f0.a1\"]", structMeta.get("aliases")); + assertEquals("f1 doc", childMeta1.get("doc")); + assertEquals("[\"f1.a1\",\"f1.a2\"]", childMeta1.get("aliases")); + assertEquals("f2 doc", childMeta2.get("doc")); + assertEquals("[\"f2.a1\",\"f2.a2\"]", childMeta2.get("aliases")); + } + + @Test + public void testNestedRecordType() throws Exception { + Schema schema = getSchema("test_nested_record.avsc"); + Schema nestedSchema = schema.getFields().get(0).schema(); + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + GenericRecord nestedRecord = new GenericData.Record(nestedSchema); + nestedRecord.put(0, "test" + i); + nestedRecord.put(1, i); + record.put(0, nestedRecord); + + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkNestedRecordResult(schema, data, root); + } + + @Test + public void testEnumType() throws Exception { + Schema schema = getSchema("test_primitive_enum.avsc"); + List<GenericData.EnumSymbol> data = Arrays.asList( + new GenericData.EnumSymbol(schema, "SPADES"), + new GenericData.EnumSymbol(schema, "HEARTS"), + new GenericData.EnumSymbol(schema, "DIAMONDS"), + new GenericData.EnumSymbol(schema, "CLUBS"), + new GenericData.EnumSymbol(schema, "SPADES")); + + List<Integer> expectedIndices = Arrays.asList(0, 1, 2, 3, 0); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expectedIndices, vector); + + VarCharVector dictVector = (VarCharVector) config.getProvider().lookup(0).getVector(); + assertEquals(4, dictVector.getValueCount()); + + assertEquals("SPADES", dictVector.getObject(0).toString()); + assertEquals("HEARTS", dictVector.getObject(1).toString()); + assertEquals("DIAMONDS", dictVector.getObject(2).toString()); + assertEquals("CLUBS", dictVector.getObject(3).toString()); + } + + @Test + public void testIntType() throws Exception { + Schema schema = getSchema("test_primitive_int.avsc"); + List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableIntType() throws Exception { + Schema schema = getSchema("test_nullable_int.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? i : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testLongType() throws Exception { + Schema schema = getSchema("test_primitive_long.avsc"); + List<Long> data = Arrays.asList(1L, 2L, 3L, 4L, 5L); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableLongType() throws Exception { + Schema schema = getSchema("test_nullable_long.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? (long) i : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testFloatType() throws Exception { + Schema schema = getSchema("test_primitive_float.avsc"); + List<Float> data = Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableFloatType() throws Exception { + Schema schema = getSchema("test_nullable_float.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? i + 0.1f : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testDoubleType() throws Exception { + Schema schema = getSchema("test_primitive_double.avsc"); + List<Double> data = Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableDoubleType() throws Exception { + Schema schema = getSchema("test_nullable_double.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? i + 0.1 : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testBytesType() throws Exception { + Schema schema = getSchema("test_primitive_bytes.avsc"); + List<ByteBuffer> data = Arrays.asList( + ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8))); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableBytesType() throws Exception { + Schema schema = getSchema("test_nullable_bytes.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)) : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testBooleanType() throws Exception { + Schema schema = getSchema("test_primitive_boolean.avsc"); + List<Boolean> data = Arrays.asList(true, false, true, false, true); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(data, vector); + } + + @Test + public void testNullableBooleanType() throws Exception { + Schema schema = getSchema("test_nullable_boolean.avsc"); + + ArrayList<GenericRecord> data = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? true : null); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + checkRecordResult(schema, data, root); + } + + @Test + public void testArrayType() throws Exception { + Schema schema = getSchema("test_array.avsc"); + List<List<?>> data = Arrays.asList( + Arrays.asList("11", "222", "999"), + Arrays.asList("12222", "2333", "1000"), + Arrays.asList("1rrr", "2ggg"), + Arrays.asList("1vvv", "2bbb"), + Arrays.asList("1fff", "2")); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkArrayResult(data, (ListVector) vector); + } + + @Test + public void testMapType() throws Exception { + Schema schema = getSchema("test_map.avsc"); + + List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6"); + List vals = Arrays.asList("val1", "val2", "val3", "val4", "val5", "val6"); + + List<LinkedHashMap> data = new ArrayList<>(); + LinkedHashMap map1 = new LinkedHashMap(); + map1.put(keys.get(0), vals.get(0)); + map1.put(keys.get(1), vals.get(1)); + data.add(map1); + + LinkedHashMap map2 = new LinkedHashMap(); + map2.put(keys.get(2), vals.get(2)); + map2.put(keys.get(3), vals.get(3)); + data.add(map2); + + LinkedHashMap map3 = new LinkedHashMap(); + map3.put(keys.get(4), vals.get(4)); + map3.put(keys.get(5), vals.get(5)); + data.add(map3); + + VectorSchemaRoot root = writeAndRead(schema, data); + MapVector vector = (MapVector) root.getFieldVectors().get(0); + + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0)); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1)); + assertEquals(0, vector.getOffsetBuffer().getInt(0)); + assertEquals(2, vector.getOffsetBuffer().getInt(1 * 4)); + assertEquals(4, vector.getOffsetBuffer().getInt(2 * 4)); + assertEquals(6, vector.getOffsetBuffer().getInt(3 * 4)); + } + + @Test + public void testFixedType() throws Exception { + Schema schema = getSchema("test_fixed.avsc"); + + List<GenericData.Fixed> data = new ArrayList<>(); + List<byte[]> expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8); + expected.add(value); + GenericData.Fixed fixed = new GenericData.Fixed(schema); + fixed.bytes(value); + data.add(fixed); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + + @Test + public void testUnionType() throws Exception { + Schema schema = getSchema("test_union.avsc"); + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<Object> expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put(0, i % 2 == 0 ? "test" + i : i); + expected.add(i % 2 == 0 ? "test" + i : i); + data.add(record); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + + @Test + public void testNullableUnionType() throws Exception { + Schema schema = getSchema("test_nullable_union.avsc"); + ArrayList<GenericRecord> data = new ArrayList<>(); + ArrayList<Object> expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GenericRecord record = new GenericData.Record(schema); + if (i % 3 == 0) { + record.put(0, "test" + i); + expected.add("test" + i); + data.add(record); + } else if (i % 3 == 1) { + record.put(0, i); + expected.add(i); + data.add(record); + } else { + record.put(0, null); + expected.add(null); + data.add(record); + } + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + +} diff --git a/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java new file mode 100644 index 000000000..bf695d193 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/java/org/apache/arrow/TestWriteReadAvroRecord.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + + +public class TestWriteReadAvroRecord { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + @Test + public void testWriteAndRead() throws Exception { + + File dataFile = TMP.newFile(); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "Ben"); + user2.put("favorite_number", 7); + user2.put("favorite_color", "red"); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); + DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); + dataFileWriter.create(schema, dataFile); + dataFileWriter.append(user1); + dataFileWriter.append(user2); + dataFileWriter.close(); + + //read data from disk + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema); + DataFileReader<GenericRecord> + dataFileReader = new DataFileReader<GenericRecord>(dataFile, datumReader); + List<GenericRecord> result = new ArrayList<>(); + while (dataFileReader.hasNext()) { + GenericRecord user = dataFileReader.next(); + result.add(user); + } + + assertEquals(2, result.size()); + GenericRecord deUser1 = result.get(0); + assertEquals("Alyssa", deUser1.get("name").toString()); + assertEquals(256, deUser1.get("favorite_number")); + assertEquals(null, deUser1.get("favorite_color")); + + GenericRecord deUser2 = result.get(1); + assertEquals("Ben", deUser2.get("name").toString()); + assertEquals(7, deUser2.get("favorite_number")); + assertEquals("red", deUser2.get("favorite_color").toString()); + } + +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc new file mode 100644 index 000000000..afd00b8d9 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_enum_attrs.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "type": "enum", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"], + "name": "testEnum", + "doc" : "enum doc", + "aliases" : ["alias1", "alias2"] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc new file mode 100644 index 000000000..55e504def --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_fixed_attr.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "type": "fixed", + "size": 6, + "name": "testFixed", + "doc" : "fixed doc", + "aliases" : ["alias1", "alias2"] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc new file mode 100644 index 000000000..2e2e311a9 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/attrs/test_record_attrs.avsc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testAttrs", + "fields": [ + { + "name" : "f0", + "type" : { + "type" : "record", + "name" : "nestedInRecord", + "doc" : "f0 doc", + "aliases" : ["f0.a1"], + "fields": [ + {"name": "f1", "type": "string", "doc": "f1 doc", "aliases" : ["f1.a1", "f1.a2"]}, + {"name": "f2", "type": "int", "doc": "f2 doc", "aliases" : ["f2.a1", "f2.a2"]} + ] + } + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc new file mode 100644 index 000000000..f661e6506 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_date.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "int", + "logicalType" : "date" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc new file mode 100644 index 000000000..18d7d63fc --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid1.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "bytes", + "logicalType" : "decimal", + "precision": 39, + "scale": 2 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc new file mode 100644 index 000000000..eed7bd781 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid2.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "bytes", + "logicalType" : "decimal", + "precision": 20, + "scale": -1 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc new file mode 100644 index 000000000..1667b8aff --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid3.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "bytes", + "logicalType" : "decimal", + "precision": 20, + "scale": 40 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc new file mode 100644 index 000000000..e1f710416 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_invalid4.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "fixed", + "size" : 1, + "logicalType" : "decimal", + "precision": 30, + "scale": 2 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc new file mode 100644 index 000000000..944b5d85d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_bytes.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "bytes", + "logicalType" : "decimal", + "precision": 10, + "scale": 2 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc new file mode 100644 index 000000000..1901f90a9 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_decimal_with_original_fixed.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "fixed", + "size" : 10, + "logicalType" : "decimal", + "precision": 10, + "scale": 2 +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc new file mode 100644 index 000000000..ee7d4e937 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_micros.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "time-micros" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc new file mode 100644 index 000000000..54877babc --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_time_millis.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "int", + "logicalType" : "time-millis" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc new file mode 100644 index 000000000..15c0bf53d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_micros.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "timestamp-micros" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc new file mode 100644 index 000000000..822a2c360 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/logical/test_timestamp_millis.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "name": "test", + "type": "long", + "logicalType" : "timestamp-millis" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc new file mode 100644 index 000000000..e836aa768 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_before.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f1", "type": {"type" : "array", "items": "string"}}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc new file mode 100644 index 000000000..36e7fdfb0 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_array_expected.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc new file mode 100644 index 000000000..5338253f4 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base1.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}}, + {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}}, + {"name": "f2", "type": "string"}, + {"name": "f3", "type": "bytes"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc new file mode 100644 index 000000000..50655a70e --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_base2.avsc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "boolean"}, + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "long"}, + {"name": "f3", "type": "float"}, + {"name": "f4", "type": "double"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc new file mode 100644 index 000000000..9b62e3149 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_boolean_expected.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "long"}, + {"name": "f3", "type": "float"}, + {"name": "f4", "type": "double"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc new file mode 100644 index 000000000..8a1903b34 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_bytes_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}}, + {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}}, + {"name": "f2", "type": "string"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc new file mode 100644 index 000000000..6021c4454 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_double_expected.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "boolean"}, + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "long"}, + {"name": "f3", "type": "float"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc new file mode 100644 index 000000000..f5ed86a28 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_enum_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}}, + {"name": "f2", "type": "string"}, + {"name": "f3", "type": "bytes"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc new file mode 100644 index 000000000..5423a7977 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_fixed_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}}, + {"name": "f2", "type": "string"}, + {"name": "f3", "type": "bytes"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc new file mode 100644 index 000000000..dea106331 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_float_expected.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "boolean"}, + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "long"}, + {"name": "f4", "type": "double"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc new file mode 100644 index 000000000..53d4f1025 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_int_expected.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "boolean"}, + {"name": "f2", "type": "long"}, + {"name": "f3", "type": "float"}, + {"name": "f4", "type": "double"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc new file mode 100644 index 000000000..bf16601dd --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_long_expected.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "boolean"}, + {"name": "f1", "type": "int"}, + {"name": "f3", "type": "float"}, + {"name": "f4", "type": "double"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc new file mode 100644 index 000000000..8cbb1a1d7 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_before.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f1", "type": {"type" : "map", "values": "string"}}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc new file mode 100644 index 000000000..36e7fdfb0 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_map_expected.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc new file mode 100644 index 000000000..b5d637b1d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_multi_fields_expected.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testSkip", + "fields": [ + {"name": "f0", "type": "string"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc new file mode 100644 index 000000000..7aee92b92 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_before.avsc @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + { + "name" : "f0", + "type" : { + "type" : "record", + "name" : "nestedInRecord", + "fields": [ + {"name": "f00", "type": "string"}, + {"name": "f01", "type": "int"} + ] + } + }, + { + "name" : "f1", "type" : "int" + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc new file mode 100644 index 000000000..3e2495203 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_record_expected.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + { "name" : "f1", "type" : "int"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc new file mode 100644 index 000000000..f3b7f8c09 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_second_level_expected.avsc @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testSkipNested", + "fields": [ + { + "name" : "nested", + "type" : { + "type" : "record", + "name" : "nestedInRecord", + "fields": [ + {"name": "f1", "type": "int"} + ] + } + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc new file mode 100644 index 000000000..553525847 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_single_field_expected.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testSkip", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc new file mode 100644 index 000000000..2d2c08174 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_string_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": {"type" : "fixed", "size":5, "name" : "fix"}}, + {"name": "f1", "type": {"type" : "enum", "name" : "enum", "symbols": ["TEST0", "TEST1"]}}, + {"name": "f3", "type": "bytes"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc new file mode 100644 index 000000000..6f42da893 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_third_level_expected.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "firstLevel", + "fields": [ + { + "name" : "f0", + "type" : { + "type" : "record", + "name" : "secondLevel", + "fields": [ + { + "name" : "f0", + "type" : { + "type" : "record", + "name" : "thirdLevel", + "fields" : [ + {"name": "f1", "type": "int"}, + {"name": "f0", "type": "string"}, + {"name": "f2", "type": "boolean"} + ] + } + } + ] + } + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc new file mode 100644 index 000000000..fc1105911 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_before.avsc @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": ["string"]}, + {"name": "f1", "type": ["string", "null"]}, + {"name": "f2", "type": ["string", "int"]}, + {"name": "f3", "type": "int"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc new file mode 100644 index 000000000..308e027a2 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_multi_fields_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": ["string"]}, + {"name": "f1", "type": ["string", "null"]}, + {"name": "f3", "type": "int"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc new file mode 100644 index 000000000..cbc83e566 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_nullable_field_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f0", "type": ["string"]}, + {"name": "f2", "type": ["string", "int"]}, + {"name": "f3", "type": "int"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc new file mode 100644 index 000000000..0f72fb432 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/skip/test_skip_union_one_field_expected.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "f1", "type": ["string", "null"]}, + {"name": "f2", "type": ["string", "int"]}, + {"name": "f3", "type": ["string", "int"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc new file mode 100644 index 000000000..92c0873de --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc new file mode 100644 index 000000000..5b75a4031 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_array.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "array", + "items": "string", + "name": "testArray" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc new file mode 100644 index 000000000..a4d96e9ab --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_fixed.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "fixed", + "size": 6, + "name": "testFixed" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc new file mode 100644 index 000000000..f784ae623 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_large_data.avsc @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testLargeData", + "fields": [ + { + "name": "f0", + "type": { + "name" : "f0", + "type" : "enum", + "symbols" : ["value1", "value2", "value3", "value4", "value5"] + } + }, + { + "name" : "f1", + "type" : { + "type" : "record", + "name" : "nestedRecord", + "fields": [ + {"name": "f1_0", "type": "string"}, + {"name": "f1_1", "type": "int"} + ] + } + }, + + {"name": "f2", "type": "string"}, + {"name": "f3", "type": "int"}, + {"name": "f4", "type": "boolean"}, + {"name": "f5", "type": "float"}, + {"name": "f6", "type": "double"}, + {"name": "f7", "type": "bytes"}, + {"name": "f8", "type": ["string", "int"]}, + { + "name": "f9", + "type": { + "name" : "f9", + "type" : "array", + "items" : "string" + } + }, + { + "name": "f10", + "type": { + "name" : "f10", + "type" : "map", + "values" : "string" + } + }, + { + "name": "f11", + "type": { + "type" : "fixed", + "name" : "f11", + "size" : 5 + } + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc new file mode 100644 index 000000000..0dfa3a595 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_map.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "map", + "values": "string", + "name": "testMap" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc new file mode 100644 index 000000000..29dddfd1a --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nested_record.avsc @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testNestedRecord", + "fields": [ + { + "name" : "f0", + "type" : { + "type" : "record", + "name" : "nestedInRecord", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f1", "type": "int"} + ] + } + } + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc new file mode 100644 index 000000000..62af1a85d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableBoolean", + "fields": [ + {"name": "f0", "type": ["null", "boolean"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc new file mode 100644 index 000000000..002bc7ce2 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableBytes", + "fields": [ + {"name": "f0", "type": ["null", "bytes"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc new file mode 100644 index 000000000..642b7aa16 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableDouble", + "fields": [ + {"name": "f0", "type": ["null", "double"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc new file mode 100644 index 000000000..dff285909 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableFloat", + "fields": [ + {"name": "f0", "type": ["null", "float"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc new file mode 100644 index 000000000..abb2fc48a --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableInt", + "fields": [ + {"name": "f0", "type": ["null", "int"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc new file mode 100644 index 000000000..0624d2737 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableLong", + "fields": [ + {"name": "f0", "type": ["null", "long"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc new file mode 100644 index 000000000..347808ce6 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "nullableString", + "fields": [ + {"name": "f0", "type": ["null", "string"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc new file mode 100644 index 000000000..af94812d7 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testNullableUnions", + "fields": [ + {"name": "f0", "type": ["string", "int", "null"]} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc new file mode 100644 index 000000000..7652ce723 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "boolean", + "name": "TestBoolean" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc new file mode 100644 index 000000000..5102430b6 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "bytes", + "name": "TestBytes" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc new file mode 100644 index 000000000..d1ae0b605 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "double", + "name": "TestDouble" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc new file mode 100644 index 000000000..bd8df6102 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_enum.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "enum", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"], + "name": "testEnum" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc new file mode 100644 index 000000000..675d1090d --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "float", + "name": "TestFloat" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc new file mode 100644 index 000000000..8fc848828 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "int", + "name": "TestInt" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc new file mode 100644 index 000000000..b9706107c --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "long", + "name": "TestLong" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc new file mode 100644 index 000000000..b4a89a7f6 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "string", + "name": "TestString" +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc new file mode 100644 index 000000000..e83cf1180 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_record.avsc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testRecord", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "boolean"} + ] +} diff --git a/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc b/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc new file mode 100644 index 000000000..f181e36e3 --- /dev/null +++ b/src/arrow/java/adapter/avro/src/test/resources/schema/test_union.avsc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "record", + "name": "testUnions", + "fields": [ + {"name": "f0", "type": ["string", "int"]} + ] +} |