diff options
Diffstat (limited to 'fluent-bit/lib/avro/src/resolver.c')
-rw-r--r-- | fluent-bit/lib/avro/src/resolver.c | 1338 |
1 files changed, 1338 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/resolver.c b/fluent-bit/lib/avro/src/resolver.c new file mode 100644 index 000000000..f0256c265 --- /dev/null +++ b/fluent-bit/lib/avro/src/resolver.c @@ -0,0 +1,1338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include <avro/platform.h> +#include <stdlib.h> +#include <string.h> + +#include "avro/allocation.h" +#include "avro/consumer.h" +#include "avro/data.h" +#include "avro/errors.h" +#include "avro/legacy.h" +#include "avro/schema.h" +#include "avro_private.h" +#include "st.h" + + +#if !defined(DEBUG_RESOLVER) +#define DEBUG_RESOLVER 0 +#endif + +#if DEBUG_RESOLVER +#include <stdio.h> +#define debug(...) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, "\n"); } +#else +#define debug(...) /* no debug output */ +#endif + + +typedef struct avro_resolver_t avro_resolver_t; + +struct avro_resolver_t { + avro_consumer_t parent; + + /* The reader schema for this resolver. */ + avro_schema_t rschema; + + /* An array of any child resolvers needed for the subschemas of + * wschema */ + avro_consumer_t **child_resolvers; + + /* If the reader and writer schemas are records, this field + * contains a mapping from writer field indices to reader field + * indices. */ + int *index_mapping; + + /* The number of elements in the child_resolvers and + * index_mapping arrays. */ + size_t num_children; + + /* If the reader schema is a union, but the writer schema is + * not, this field indicates which branch of the reader union + * should be selected. */ + int reader_union_branch; +}; + + +/** + * Frees a resolver object, while ensuring that all of the resolvers in + * a graph of resolvers is only freed once. + */ + +static void +avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + + /* + * First check if we've already started freeing this resolver. + */ + + if (st_lookup(freeing, (st_data_t) resolver, NULL)) { + return; + } + + /* + * Otherwise add this resolver to the freeing set, and then + * actually free the thing. + */ + + st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL); + + avro_schema_decref(resolver->parent.schema); + avro_schema_decref(resolver->rschema); + if (resolver->child_resolvers) { + unsigned int i; + for (i = 0; i < resolver->num_children; i++) { + avro_consumer_t *child = resolver->child_resolvers[i]; + if (child) { + avro_resolver_free_cycles(child, freeing); + } + } + avro_free(resolver->child_resolvers, + sizeof(avro_resolver_t *) * resolver->num_children); + } + if (resolver->index_mapping) { + avro_free(resolver->index_mapping, + sizeof(int) * resolver->num_children); + } + avro_freet(avro_resolver_t, resolver); +} + + +static void +avro_resolver_free(avro_consumer_t *consumer) +{ + st_table *freeing = st_init_numtable(); + avro_resolver_free_cycles(consumer, freeing); + st_free_table(freeing); +} + +/** + * Create a new avro_resolver_t instance. You must fill in the callback + * pointers that are appropriate for the writer schema after this + * function returns. + */ + +static avro_resolver_t * +avro_resolver_create(avro_schema_t wschema, + avro_schema_t rschema) +{ + avro_resolver_t *resolver = (avro_resolver_t *) avro_new(avro_resolver_t); + memset(resolver, 0, sizeof(avro_resolver_t)); + + resolver->parent.free = avro_resolver_free; + resolver->parent.schema = avro_schema_incref(wschema); + resolver->rschema = avro_schema_incref(rschema); + resolver->reader_union_branch = -1; + return resolver; +} + + +static avro_datum_t +avro_resolver_get_real_dest(avro_resolver_t *resolver, avro_datum_t dest) +{ + if (resolver->reader_union_branch < 0) { + /* + * The reader schema isn't a union, so use the dest + * field as-is. + */ + + return dest; + } + + debug("Retrieving union branch %d for %s value", + resolver->reader_union_branch, + avro_schema_type_name(resolver->parent.schema)); + + avro_datum_t branch = NULL; + avro_union_set_discriminant + (dest, resolver->reader_union_branch, &branch); + return branch; +} + + +#define skip_links(schema) \ + while (is_avro_link(schema)) { \ + schema = avro_schema_link_target(schema); \ + } + + +/*----------------------------------------------------------------------- + * Memoized resolvers + */ + +static avro_consumer_t * +avro_resolver_new_memoized(avro_memoize_t *mem, + avro_schema_t wschema, avro_schema_t rschema); + + +/*----------------------------------------------------------------------- + * Reader unions + */ + +/* + * For each Avro type, we have to check whether the reader schema on its + * own is compatible, and whether the reader is a union that contains a + * compatible type. The macros in this section help us perform both of + * these checks with less code. + */ + + +/** + * A helper macro that handles the case where neither writer nor reader + * are unions. Uses @ref check_func to see if the two schemas are + * compatible. + */ + +#define check_non_union(saved, wschema, rschema, check_func) \ +do { \ + avro_resolver_t *self = NULL; \ + int rc = check_func(saved, &self, wschema, rschema, \ + rschema); \ + if (self) { \ + debug("Non-union schemas %s (writer) " \ + "and %s (reader) match", \ + avro_schema_type_name(wschema), \ + avro_schema_type_name(rschema)); \ + \ + self->reader_union_branch = -1; \ + return &self->parent; \ + } \ + \ + if (rc) { \ + return NULL; \ + } \ +} while (0) + + +/** + * Helper macro that handles the case where the reader is a union, and + * the writer is not. Checks each branch of the reader union schema, + * looking for the first branch that is compatible with the writer + * schema. The @ref check_func argument should be a function that can + * check the compatiblity of each branch schema. + */ + +#define check_reader_union(saved, wschema, rschema, check_func) \ +do { \ + if (!is_avro_union(rschema)) { \ + break; \ + } \ + \ + debug("Checking reader union schema"); \ + size_t num_branches = avro_schema_union_size(rschema); \ + unsigned int i; \ + \ + for (i = 0; i < num_branches; i++) { \ + avro_schema_t branch_schema = \ + avro_schema_union_branch(rschema, i); \ + skip_links(branch_schema); \ + avro_resolver_t *self = NULL; \ + int rc = check_func(saved, &self, \ + wschema, branch_schema, \ + rschema); \ + if (self) { \ + debug("Reader union branch %d (%s) " \ + "and writer %s match", \ + i, avro_schema_type_name(branch_schema), \ + avro_schema_type_name(wschema)); \ + self->reader_union_branch = i; \ + return &self->parent; \ + } else { \ + debug("Reader union branch %d (%s) " \ + "doesn't match", \ + i, avro_schema_type_name(branch_schema)); \ + } \ + \ + if (rc) { \ + return NULL; \ + } \ + } \ + \ + debug("No reader union branches match"); \ +} while (0) + +/** + * A helper macro that defines wraps together check_non_union and + * check_reader_union for a simple (non-union) writer schema type. + */ + +#define check_simple_writer(saved, wschema, rschema, type_name) \ +do { \ + check_non_union(saved, wschema, rschema, try_##type_name); \ + check_reader_union(saved, wschema, rschema, try_##type_name); \ + debug("Writer %s doesn't match reader %s", \ + avro_schema_type_name(wschema), \ + avro_schema_type_name(rschema)); \ + avro_set_error("Cannot store " #type_name " into %s", \ + avro_schema_type_name(rschema)); \ + return NULL; \ +} while (0) + + +/*----------------------------------------------------------------------- + * primitives + */ + +static int +avro_resolver_boolean_value(avro_consumer_t *consumer, int value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %s into %p", value? "TRUE": "FALSE", dest); + return avro_boolean_set(dest, value); +} + +static int +try_boolean(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_boolean(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.boolean_value = avro_resolver_boolean_value; + } + return 0; +} + + +static void +free_bytes(void *ptr, size_t sz) +{ + /* + * The binary encoder class allocates bytes values with an extra + * byte, so that they're NUL terminated. + */ + avro_free(ptr, sz+1); +} + +static int +avro_resolver_bytes_value(avro_consumer_t *consumer, + const void *value, size_t value_len, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRIsz " bytes into %p", value_len, dest); + return avro_givebytes_set(dest, (const char *) value, value_len, free_bytes); +} + +static int +try_bytes(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_bytes(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.bytes_value = avro_resolver_bytes_value; + } + return 0; +} + + +static int +avro_resolver_double_value(avro_consumer_t *consumer, double value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %le into %p", value, dest); + return avro_double_set(dest, value); +} + +static int +try_double(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_double(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.double_value = avro_resolver_double_value; + } + return 0; +} + + +static int +avro_resolver_float_value(avro_consumer_t *consumer, float value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %e into %p", value, dest); + return avro_float_set(dest, value); +} + +static int +avro_resolver_float_double_value(avro_consumer_t *consumer, float value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %e into %p (promoting float to double)", value, dest); + return avro_double_set(dest, value); +} + +static int +try_float(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_float(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.float_value = avro_resolver_float_value; + } + else if (is_avro_double(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.float_value = avro_resolver_float_double_value; + } + return 0; +} + + +static int +avro_resolver_int_value(avro_consumer_t *consumer, int32_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId32 " into %p", value, dest); + return avro_int32_set(dest, value); +} + +static int +avro_resolver_int_long_value(avro_consumer_t *consumer, int32_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId32 " into %p (promoting int to long)", value, dest); + return avro_int64_set(dest, value); +} + +static int +avro_resolver_int_double_value(avro_consumer_t *consumer, int32_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId32 " into %p (promoting int to double)", value, dest); + return avro_double_set(dest, value); +} + +static int +avro_resolver_int_float_value(avro_consumer_t *consumer, int32_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId32 " into %p (promoting int to float)", value, dest); + return avro_float_set(dest, (const float) value); +} + +static int +try_int(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_int32(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.int_value = avro_resolver_int_value; + } + else if (is_avro_int64(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.int_value = avro_resolver_int_long_value; + } + else if (is_avro_double(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.int_value = avro_resolver_int_double_value; + } + else if (is_avro_float(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.int_value = avro_resolver_int_float_value; + } + return 0; +} + + +static int +avro_resolver_long_value(avro_consumer_t *consumer, int64_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId64 " into %p", value, dest); + return avro_int64_set(dest, value); +} + +static int +avro_resolver_long_float_value(avro_consumer_t *consumer, int64_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId64 " into %p (promoting long to float)", value, dest); + return avro_float_set(dest, (const float) value); +} + +static int +avro_resolver_long_double_value(avro_consumer_t *consumer, int64_t value, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing %" PRId64 " into %p (promoting long to double)", value, dest); + return avro_double_set(dest, (const double) value); +} + +static int +try_long(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_int64(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.long_value = avro_resolver_long_value; + } + else if (is_avro_double(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.long_value = avro_resolver_long_double_value; + } + else if (is_avro_float(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.long_value = avro_resolver_long_float_value; + } + return 0; +} + + +static int +avro_resolver_null_value(avro_consumer_t *consumer, void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + + AVRO_UNUSED(dest); + debug("Storing null into %p", dest); + return 0; +} + +static int +try_null(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_null(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.null_value = avro_resolver_null_value; + } + return 0; +} + + +static int +avro_resolver_string_value(avro_consumer_t *consumer, + const void *value, size_t value_len, + void *user_data) +{ + AVRO_UNUSED(value_len); + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing \"%s\" into %p", (const char *) value, dest); + return avro_givestring_set(dest, (const char *) value, avro_alloc_free_func); +} + +static int +try_string(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_string(rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.string_value = avro_resolver_string_value; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * arrays + */ + +static int +avro_resolver_array_start_block(avro_consumer_t *consumer, + int is_first_block, + unsigned int block_count, + void *user_data) +{ + if (is_first_block) { + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + AVRO_UNUSED(dest); + + debug("Starting array %p", dest); + } + + AVRO_UNUSED(block_count); + return 0; +} + +static int +avro_resolver_array_element(avro_consumer_t *consumer, + unsigned int index, + avro_consumer_t **element_consumer, + void **element_user_data, + void *user_data) +{ + AVRO_UNUSED(index); + + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Adding element to array %p", dest); + + /* + * Allocate a new element datum and add it to the array. + */ + + avro_schema_t array_schema = avro_datum_get_schema(dest); + avro_schema_t item_schema = avro_schema_array_items(array_schema); + avro_datum_t element = avro_datum_from_schema(item_schema); + avro_array_append_datum(dest, element); + avro_datum_decref(element); + + /* + * Return the consumer that we allocated to process the array's + * children. + */ + + *element_consumer = resolver->child_resolvers[0]; + *element_user_data = element; + return 0; +} + +static int +try_array(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is an array. + */ + + if (!is_avro_array(rschema)) { + return 0; + } + + /* + * Array schemas have to have compatible element schemas to be + * compatible themselves. Try to create an avro_resolver_t to + * check the compatibility. + */ + + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + + avro_schema_t witems = avro_schema_array_items(wschema); + avro_schema_t ritems = avro_schema_array_items(rschema); + + avro_consumer_t *item_consumer = + avro_resolver_new_memoized(mem, witems, ritems); + if (!item_consumer) { + avro_memoize_delete(mem, wschema, root_rschema); + avro_consumer_free(&(*resolver)->parent); + avro_prefix_error("Array values aren't compatible: "); + return EINVAL; + } + + /* + * The two schemas are compatible, so go ahead and create a + * GavroResolver for the array. Store the item schema's + * resolver into the child_resolvers field. + */ + + (*resolver)->num_children = 1; + (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *)); + (*resolver)->child_resolvers[0] = item_consumer; + (*resolver)->parent.array_start_block = avro_resolver_array_start_block; + (*resolver)->parent.array_element = avro_resolver_array_element; + + return 0; +} + + +/*----------------------------------------------------------------------- + * enums + */ + +static int +avro_resolver_enum_value(avro_consumer_t *consumer, int value, + void *user_data) +{ + AVRO_UNUSED(value); + + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + + const char *symbol_name = avro_schema_enum_get(resolver->parent.schema, value); + debug("Storing symbol %s into %p", symbol_name, dest); + return avro_enum_set_name(dest, symbol_name); +} + +static int +try_enum(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * Enum schemas have to have the same name — but not the same + * list of symbols — to be compatible. + */ + + if (is_avro_enum(rschema)) { + const char *wname = avro_schema_name(wschema); + const char *rname = avro_schema_name(rschema); + + if (!strcmp(wname, rname)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.enum_value = avro_resolver_enum_value; + } + } + return 0; +} + + +/*----------------------------------------------------------------------- + * fixed + */ + +static int +avro_resolver_fixed_value(avro_consumer_t *consumer, + const void *value, size_t value_len, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Storing (fixed) %" PRIsz " bytes into %p", value_len, dest); + return avro_givefixed_set(dest, (const char *) value, value_len, avro_alloc_free_func); +} + +static int +try_fixed(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * Fixed schemas need the same name and size to be compatible. + */ + + if (avro_schema_equal(wschema, rschema)) { + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + (*resolver)->parent.fixed_value = avro_resolver_fixed_value; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * maps + */ + +static int +avro_resolver_map_start_block(avro_consumer_t *consumer, + int is_first_block, + unsigned int block_count, + void *user_data) +{ + if (is_first_block) { + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + AVRO_UNUSED(dest); + + debug("Starting map %p", dest); + } + + AVRO_UNUSED(block_count); + return 0; +} + +static int +avro_resolver_map_element(avro_consumer_t *consumer, + unsigned int index, + const char *key, + avro_consumer_t **value_consumer, + void **value_user_data, + void *user_data) +{ + AVRO_UNUSED(index); + + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + debug("Adding element to map %p", dest); + + /* + * Allocate a new element datum and add it to the map. + */ + + avro_schema_t map_schema = avro_datum_get_schema(dest); + avro_schema_t value_schema = avro_schema_map_values(map_schema); + avro_datum_t value = avro_datum_from_schema(value_schema); + avro_map_set(dest, key, value); + avro_datum_decref(value); + + /* + * Return the consumer that we allocated to process the map's + * children. + */ + + *value_consumer = resolver->child_resolvers[0]; + *value_user_data = value; + return 0; +} + +static int +try_map(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is an map. + */ + + if (!is_avro_map(rschema)) { + return 0; + } + + /* + * Array schemas have to have compatible element schemas to be + * compatible themselves. Try to create an avro_resolver_t to + * check the compatibility. + */ + + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + + avro_schema_t wvalues = avro_schema_map_values(wschema); + avro_schema_t rvalues = avro_schema_map_values(rschema); + + avro_consumer_t *value_consumer = + avro_resolver_new_memoized(mem, wvalues, rvalues); + if (!value_consumer) { + avro_memoize_delete(mem, wschema, root_rschema); + avro_consumer_free(&(*resolver)->parent); + avro_prefix_error("Map values aren't compatible: "); + return EINVAL; + } + + /* + * The two schemas are compatible, so go ahead and create a + * GavroResolver for the map. Store the value schema's + * resolver into the child_resolvers field. + */ + + (*resolver)->num_children = 1; + (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *)); + (*resolver)->child_resolvers[0] = value_consumer; + (*resolver)->parent.map_start_block = avro_resolver_map_start_block; + (*resolver)->parent.map_element = avro_resolver_map_element; + + return 0; +} + + +/*----------------------------------------------------------------------- + * records + */ + +static int +avro_resolver_record_start(avro_consumer_t *consumer, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + AVRO_UNUSED(dest); + + debug("Starting record at %p", dest); + + /* + * TODO: Eventually, we'll fill in default values for the extra + * reader fields here. + */ + + return 0; +} + +static int +avro_resolver_record_field(avro_consumer_t *consumer, + unsigned int index, + avro_consumer_t **field_consumer, + void **field_user_data, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + avro_datum_t ud_dest = (avro_datum_t) user_data; + avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); + + const char *field_name = + avro_schema_record_field_name(consumer->schema, index); + + /* + * Grab the resolver for this field of the writer record. If + * it's NULL, this this field doesn't exist in the reader + * record, and should be skipped. + */ + + debug("Retrieving resolver for writer field %i (%s)", + index, field_name); + + if (!resolver->child_resolvers[index]) { + debug("Reader doesn't have field %s, skipping", field_name); + return 0; + } + + /* + * TODO: Once we can retrieve record fields by index (quickly), + * use the index_mapping. + */ + + avro_datum_t field = NULL; + avro_record_get(dest, field_name, &field); + + *field_consumer = resolver->child_resolvers[index]; + *field_user_data = field; + return 0; +} + +static int +try_record(avro_memoize_t *mem, avro_resolver_t **resolver, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is also a record, and has the + * same name as the writer. + */ + + if (!is_avro_record(rschema)) { + return 0; + } + + const char *wname = avro_schema_name(wschema); + const char *rname = avro_schema_name(rschema); + + if (strcmp(wname, rname)) { + return 0; + } + + /* + * Categorize the fields in the record schemas. Fields that are + * only in the writer are ignored. Fields that are only in the + * reader raise a schema mismatch error, unless the field has a + * default value. Fields that are in both are resolved + * recursively. + * + * The child_resolver array will contain an avro_resolver_t for + * each field in the writer schema. To build this array, we + * loop through the fields of the reader schema. If that field + * is also in the writer schema, we resolve them recursively, + * and store the resolver into the array. If the field isn't in + * the writer schema, we raise an error. (TODO: Eventually, + * we'll handle default values here.) After this loop finishes, + * any NULLs in the child_resolver array will represent fields + * in the writer but not the reader; these fields will be + * skipped when processing the input. + */ + + *resolver = avro_resolver_create(wschema, root_rschema); + avro_memoize_set(mem, wschema, root_rschema, *resolver); + + size_t wfields = avro_schema_record_size(wschema); + size_t rfields = avro_schema_record_size(rschema); + + debug("Checking writer record schema %s", wname); + + avro_consumer_t **child_resolvers = + (avro_consumer_t **) avro_calloc(wfields, sizeof(avro_consumer_t *)); + int *index_mapping = (int *) avro_calloc(wfields, sizeof(int)); + + unsigned int ri; + for (ri = 0; ri < rfields; ri++) { + avro_schema_t rfield = + avro_schema_record_field_get_by_index(rschema, ri); + const char *field_name = + avro_schema_record_field_name(rschema, ri); + + debug("Resolving reader record field %u (%s)", ri, field_name); + + /* + * See if this field is also in the writer schema. + */ + + int wi = avro_schema_record_field_get_index(wschema, field_name); + + if (wi == -1) { + /* + * This field isn't in the writer schema — + * that's an error! TODO: Handle default + * values! + */ + + debug("Field %s isn't in writer", field_name); + avro_set_error("Reader field %s doesn't appear in writer", + field_name); + goto error; + } + + /* + * Try to recursively resolve the schemas for this + * field. If they're not compatible, that's an error. + */ + + avro_schema_t wfield = + avro_schema_record_field_get_by_index(wschema, wi); + avro_consumer_t *field_resolver = + avro_resolver_new_memoized(mem, wfield, rfield); + + if (!field_resolver) { + avro_prefix_error("Field %s isn't compatible: ", field_name); + goto error; + } + + /* + * Save the details for this field. + */ + + debug("Found match for field %s (%u in reader, %d in writer)", + field_name, ri, wi); + child_resolvers[wi] = field_resolver; + index_mapping[wi] = ri; + } + + /* + * We might not have found matches for all of the writer fields, + * but that's okay — any extras will be ignored. + */ + + (*resolver)->num_children = wfields; + (*resolver)->child_resolvers = child_resolvers; + (*resolver)->index_mapping = index_mapping; + (*resolver)->parent.record_start = avro_resolver_record_start; + (*resolver)->parent.record_field = avro_resolver_record_field; + return 0; + +error: + /* + * Clean up any consumer we might have already created. + */ + + avro_memoize_delete(mem, wschema, root_rschema); + avro_consumer_free(&(*resolver)->parent); + + { + unsigned int i; + for (i = 0; i < wfields; i++) { + if (child_resolvers[i]) { + avro_consumer_free(child_resolvers[i]); + } + } + } + + avro_free(child_resolvers, wfields * sizeof(avro_consumer_t *)); + avro_free(index_mapping, wfields * sizeof(int)); + return EINVAL; +} + + +/*----------------------------------------------------------------------- + * union + */ + +static int +avro_resolver_union_branch(avro_consumer_t *consumer, + unsigned int discriminant, + avro_consumer_t **branch_consumer, + void **branch_user_data, + void *user_data) +{ + avro_resolver_t *resolver = (avro_resolver_t *) consumer; + + /* + * Grab the resolver for this branch of the writer union. If + * it's NULL, then this branch is incompatible with the reader. + */ + + debug("Retrieving resolver for writer branch %u", discriminant); + + if (!resolver->child_resolvers[discriminant]) { + avro_set_error("Writer union branch %u is incompatible " + "with reader schema \"%s\"", + discriminant, avro_schema_type_name(resolver->rschema)); + return EINVAL; + } + + /* + * Return the branch's resolver. + */ + + *branch_consumer = resolver->child_resolvers[discriminant]; + *branch_user_data = user_data; + return 0; +} + +static avro_consumer_t * +try_union(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema) +{ + /* + * For a writer union, we recursively try to resolve each branch + * against the reader schema. This will work correctly whether + * or not the reader is also a union — if the reader is a union, + * then we'll resolve each (non-union) writer branch against the + * reader union, which will be checked in our calls to + * check_simple_writer below. The net result is that we might + * end up trying every combination of writer and reader + * branches, when looking for compatible schemas. + * + * Regardless of what the reader schema is, for each writer + * branch, we stash away the recursive avro_resolver_t into the + * child_resolvers array. A NULL entry in this array means that + * that branch isn't compatible with the reader. This isn't an + * immediate schema resolution error, since we allow + * incompatible branches in the types as long as that branch + * never appears in the actual data. We only return an error if + * there are *no* branches that are compatible. + */ + + size_t num_branches = avro_schema_union_size(wschema); + debug("Checking %" PRIsz "-branch writer union schema", num_branches); + + avro_resolver_t *resolver = avro_resolver_create(wschema, rschema); + avro_memoize_set(mem, wschema, rschema, resolver); + + avro_consumer_t **child_resolvers = + (avro_consumer_t **) avro_calloc(num_branches, sizeof(avro_consumer_t *)); + int some_branch_compatible = 0; + + unsigned int i; + for (i = 0; i < num_branches; i++) { + avro_schema_t branch_schema = + avro_schema_union_branch(wschema, i); + + debug("Resolving writer union branch %u (%s)", + i, avro_schema_type_name(branch_schema)); + + /* + * Try to recursively resolve this branch of the writer + * union. Don't raise an error if this fails — it's + * okay for some of the branches to not be compatible + * with the reader, as long as those branches never + * appear in the input. + */ + + child_resolvers[i] = + avro_resolver_new_memoized(mem, branch_schema, rschema); + if (child_resolvers[i]) { + debug("Found match for writer union branch %u", i); + some_branch_compatible = 1; + } else { + debug("No match for writer union branch %u", i); + } + } + + /* + * As long as there's at least one branch that's compatible with + * the reader, then we consider this schema resolution a + * success. + */ + + if (!some_branch_compatible) { + debug("No writer union branches match"); + avro_set_error("No branches in the writer are compatible " + "with reader schema %s", + avro_schema_type_name(rschema)); + goto error; + } + + resolver->num_children = num_branches; + resolver->child_resolvers = child_resolvers; + resolver->parent.union_branch = avro_resolver_union_branch; + return &resolver->parent; + +error: + /* + * Clean up any consumer we might have already created. + */ + + avro_memoize_delete(mem, wschema, rschema); + avro_consumer_free(&resolver->parent); + + for (i = 0; i < num_branches; i++) { + if (child_resolvers[i]) { + avro_consumer_free(child_resolvers[i]); + } + } + + avro_free(child_resolvers, num_branches * sizeof(avro_consumer_t *)); + return NULL; +} + + +/*----------------------------------------------------------------------- + * schema type dispatcher + */ + +static avro_consumer_t * +avro_resolver_new_memoized(avro_memoize_t *mem, + avro_schema_t wschema, avro_schema_t rschema) +{ + check_param(NULL, is_avro_schema(wschema), "writer schema"); + check_param(NULL, is_avro_schema(rschema), "reader schema"); + + skip_links(wschema); + skip_links(rschema); + + /* + * First see if we've already matched these two schemas. If so, + * just return that resolver. + */ + + avro_resolver_t *saved = NULL; + if (avro_memoize_get(mem, wschema, rschema, (void **) &saved)) { + debug("Already resolved %s and %s", + avro_schema_type_name(wschema), + avro_schema_type_name(rschema)); + return &saved->parent; + } + + /* + * Otherwise we have some work to do. + */ + + switch (avro_typeof(wschema)) + { + case AVRO_BOOLEAN: + check_simple_writer(mem, wschema, rschema, boolean); + return NULL; + + case AVRO_BYTES: + check_simple_writer(mem, wschema, rschema, bytes); + return NULL; + + case AVRO_DOUBLE: + check_simple_writer(mem, wschema, rschema, double); + return NULL; + + case AVRO_FLOAT: + check_simple_writer(mem, wschema, rschema, float); + return NULL; + + case AVRO_INT32: + check_simple_writer(mem, wschema, rschema, int); + return NULL; + + case AVRO_INT64: + check_simple_writer(mem, wschema, rschema, long); + return NULL; + + case AVRO_NULL: + check_simple_writer(mem, wschema, rschema, null); + return NULL; + + case AVRO_STRING: + check_simple_writer(mem, wschema, rschema, string); + return NULL; + + case AVRO_ARRAY: + check_simple_writer(mem, wschema, rschema, array); + return NULL; + + case AVRO_ENUM: + check_simple_writer(mem, wschema, rschema, enum); + return NULL; + + case AVRO_FIXED: + check_simple_writer(mem, wschema, rschema, fixed); + return NULL; + + case AVRO_MAP: + check_simple_writer(mem, wschema, rschema, map); + return NULL; + + case AVRO_RECORD: + check_simple_writer(mem, wschema, rschema, record); + return NULL; + + case AVRO_UNION: + return try_union(mem, wschema, rschema); + + default: + avro_set_error("Unknown schema type"); + return NULL; + } + + return NULL; +} + + +avro_consumer_t * +avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_memoize_t mem; + avro_memoize_init(&mem); + avro_consumer_t *result = + avro_resolver_new_memoized(&mem, wschema, rschema); + avro_memoize_done(&mem); + return result; +} |