/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ #include "avro_private.h" #include "avro/allocation.h" #include "avro/consumer.h" #include "avro/errors.h" #include "avro/resolver.h" #include "avro/value.h" #include #include #include #include "encoding.h" #include "schema.h" #include "datum.h" static int read_enum(avro_reader_t reader, const avro_encoding_t * enc, avro_consumer_t *consumer, void *ud) { int rval; int64_t index; check_prefix(rval, enc->read_long(reader, &index), "Cannot read enum value: "); return avro_consumer_call(consumer, enum_value, index, ud); } static int read_array(avro_reader_t reader, const avro_encoding_t * enc, avro_consumer_t *consumer, void *ud) { int rval; int64_t i; /* index within the current block */ int64_t index = 0; /* index within the entire array */ int64_t block_count; int64_t block_size; check_prefix(rval, enc->read_long(reader, &block_count), "Cannot read array block count: "); check(rval, avro_consumer_call(consumer, array_start_block, 1, block_count, ud)); while (block_count != 0) { if (block_count < 0) { block_count = block_count * -1; check_prefix(rval, enc->read_long(reader, &block_size), "Cannot read array block size: "); } for (i = 0; i < block_count; i++, index++) { avro_consumer_t *element_consumer = NULL; void *element_ud = NULL; check(rval, avro_consumer_call(consumer, array_element, index, &element_consumer, &element_ud, ud)); check(rval, avro_consume_binary(reader, element_consumer, element_ud)); } check_prefix(rval, enc->read_long(reader, &block_count), "Cannot read array block count: "); check(rval, avro_consumer_call(consumer, array_start_block, 0, block_count, ud)); } return 0; } static int read_map(avro_reader_t reader, const avro_encoding_t * enc, avro_consumer_t *consumer, void *ud) { int rval; int64_t i; /* index within the current block */ int64_t index = 0; /* index within the entire array */ int64_t block_count; int64_t block_size; check_prefix(rval, enc->read_long(reader, &block_count), "Cannot read map block count: "); check(rval, avro_consumer_call(consumer, map_start_block, 1, block_count, ud)); while (block_count != 0) { if (block_count < 0) { block_count = block_count * -1; check_prefix(rval, enc->read_long(reader, &block_size), "Cannot read map block size: "); } for (i = 0; i < block_count; i++, index++) { char *key; int64_t key_size; avro_consumer_t *element_consumer = NULL; void *element_ud = NULL; check_prefix(rval, enc->read_string(reader, &key, &key_size), "Cannot read map key: "); rval = avro_consumer_call(consumer, map_element, index, key, &element_consumer, &element_ud, ud); if (rval) { avro_free(key, key_size); return rval; } rval = avro_consume_binary(reader, element_consumer, element_ud); if (rval) { avro_free(key, key_size); return rval; } avro_free(key, key_size); } check_prefix(rval, enc->read_long(reader, &block_count), "Cannot read map block count: "); check(rval, avro_consumer_call(consumer, map_start_block, 0, block_count, ud)); } return 0; } static int read_union(avro_reader_t reader, const avro_encoding_t * enc, avro_consumer_t *consumer, void *ud) { int rval; int64_t discriminant; avro_consumer_t *branch_consumer = NULL; void *branch_ud = NULL; check_prefix(rval, enc->read_long(reader, &discriminant), "Cannot read union discriminant: "); check(rval, avro_consumer_call(consumer, union_branch, discriminant, &branch_consumer, &branch_ud, ud)); return avro_consume_binary(reader, branch_consumer, branch_ud); } static int read_record(avro_reader_t reader, const avro_encoding_t * enc, avro_consumer_t *consumer, void *ud) { int rval; size_t num_fields; unsigned int i; AVRO_UNUSED(enc); check(rval, avro_consumer_call(consumer, record_start, ud)); num_fields = avro_schema_record_size(consumer->schema); for (i = 0; i < num_fields; i++) { avro_consumer_t *field_consumer = NULL; void *field_ud = NULL; check(rval, avro_consumer_call(consumer, record_field, i, &field_consumer, &field_ud, ud)); if (field_consumer) { check(rval, avro_consume_binary(reader, field_consumer, field_ud)); } else { avro_schema_t field_schema = avro_schema_record_field_get_by_index(consumer->schema, i); check(rval, avro_skip_data(reader, field_schema)); } } return 0; } int avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud) { int rval; const avro_encoding_t *enc = &avro_binary_encoding; check_param(EINVAL, reader, "reader"); check_param(EINVAL, consumer, "consumer"); switch (avro_typeof(consumer->schema)) { case AVRO_NULL: check_prefix(rval, enc->read_null(reader), "Cannot read null value: "); check(rval, avro_consumer_call(consumer, null_value, ud)); break; case AVRO_BOOLEAN: { int8_t b; check_prefix(rval, enc->read_boolean(reader, &b), "Cannot read boolean value: "); check(rval, avro_consumer_call(consumer, boolean_value, b, ud)); } break; case AVRO_STRING: { int64_t len; char *s; check_prefix(rval, enc->read_string(reader, &s, &len), "Cannot read string value: "); check(rval, avro_consumer_call(consumer, string_value, s, len, ud)); } break; case AVRO_INT32: { int32_t i; check_prefix(rval, enc->read_int(reader, &i), "Cannot read int value: "); check(rval, avro_consumer_call(consumer, int_value, i, ud)); } break; case AVRO_INT64: { int64_t l; check_prefix(rval, enc->read_long(reader, &l), "Cannot read long value: "); check(rval, avro_consumer_call(consumer, long_value, l, ud)); } break; case AVRO_FLOAT: { float f; check_prefix(rval, enc->read_float(reader, &f), "Cannot read float value: "); check(rval, avro_consumer_call(consumer, float_value, f, ud)); } break; case AVRO_DOUBLE: { double d; check_prefix(rval, enc->read_double(reader, &d), "Cannot read double value: "); check(rval, avro_consumer_call(consumer, double_value, d, ud)); } break; case AVRO_BYTES: { char *bytes; int64_t len; check_prefix(rval, enc->read_bytes(reader, &bytes, &len), "Cannot read bytes value: "); check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud)); } break; case AVRO_FIXED: { char *bytes; int64_t size = avro_schema_to_fixed(consumer->schema)->size; bytes = (char *) avro_malloc(size); if (!bytes) { avro_prefix_error("Cannot allocate new fixed value"); return ENOMEM; } rval = avro_read(reader, bytes, size); if (rval) { avro_prefix_error("Cannot read fixed value: "); avro_free(bytes, size); return rval; } rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud); if (rval) { avro_free(bytes, size); return rval; } } break; case AVRO_ENUM: check(rval, read_enum(reader, enc, consumer, ud)); break; case AVRO_ARRAY: check(rval, read_array(reader, enc, consumer, ud)); break; case AVRO_MAP: check(rval, read_map(reader, enc, consumer, ud)); break; case AVRO_UNION: check(rval, read_union(reader, enc, consumer, ud)); break; case AVRO_RECORD: check(rval, read_record(reader, enc, consumer, ud)); break; case AVRO_LINK: avro_set_error("Consumer can't consume a link schema directly"); return EINVAL; } return 0; }