/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ #include #include #include #include "avro/allocation.h" #include "avro/consumer.h" #include "avro/data.h" #include "avro/errors.h" #include "avro/legacy.h" #include "avro/schema.h" #include "avro_private.h" #include "st.h" #if !defined(DEBUG_RESOLVER) #define DEBUG_RESOLVER 0 #endif #if DEBUG_RESOLVER #include #define debug(...) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, "\n"); } #else #define debug(...) /* no debug output */ #endif typedef struct avro_resolver_t avro_resolver_t; struct avro_resolver_t { avro_consumer_t parent; /* The reader schema for this resolver. */ avro_schema_t rschema; /* An array of any child resolvers needed for the subschemas of * wschema */ avro_consumer_t **child_resolvers; /* If the reader and writer schemas are records, this field * contains a mapping from writer field indices to reader field * indices. */ int *index_mapping; /* The number of elements in the child_resolvers and * index_mapping arrays. */ size_t num_children; /* If the reader schema is a union, but the writer schema is * not, this field indicates which branch of the reader union * should be selected. */ int reader_union_branch; }; /** * Frees a resolver object, while ensuring that all of the resolvers in * a graph of resolvers is only freed once. */ static void avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; /* * First check if we've already started freeing this resolver. */ if (st_lookup(freeing, (st_data_t) resolver, NULL)) { return; } /* * Otherwise add this resolver to the freeing set, and then * actually free the thing. */ st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL); avro_schema_decref(resolver->parent.schema); avro_schema_decref(resolver->rschema); if (resolver->child_resolvers) { unsigned int i; for (i = 0; i < resolver->num_children; i++) { avro_consumer_t *child = resolver->child_resolvers[i]; if (child) { avro_resolver_free_cycles(child, freeing); } } avro_free(resolver->child_resolvers, sizeof(avro_resolver_t *) * resolver->num_children); } if (resolver->index_mapping) { avro_free(resolver->index_mapping, sizeof(int) * resolver->num_children); } avro_freet(avro_resolver_t, resolver); } static void avro_resolver_free(avro_consumer_t *consumer) { st_table *freeing = st_init_numtable(); avro_resolver_free_cycles(consumer, freeing); st_free_table(freeing); } /** * Create a new avro_resolver_t instance. You must fill in the callback * pointers that are appropriate for the writer schema after this * function returns. */ static avro_resolver_t * avro_resolver_create(avro_schema_t wschema, avro_schema_t rschema) { avro_resolver_t *resolver = (avro_resolver_t *) avro_new(avro_resolver_t); memset(resolver, 0, sizeof(avro_resolver_t)); resolver->parent.free = avro_resolver_free; resolver->parent.schema = avro_schema_incref(wschema); resolver->rschema = avro_schema_incref(rschema); resolver->reader_union_branch = -1; return resolver; } static avro_datum_t avro_resolver_get_real_dest(avro_resolver_t *resolver, avro_datum_t dest) { if (resolver->reader_union_branch < 0) { /* * The reader schema isn't a union, so use the dest * field as-is. */ return dest; } debug("Retrieving union branch %d for %s value", resolver->reader_union_branch, avro_schema_type_name(resolver->parent.schema)); avro_datum_t branch = NULL; avro_union_set_discriminant (dest, resolver->reader_union_branch, &branch); return branch; } #define skip_links(schema) \ while (is_avro_link(schema)) { \ schema = avro_schema_link_target(schema); \ } /*----------------------------------------------------------------------- * Memoized resolvers */ static avro_consumer_t * avro_resolver_new_memoized(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema); /*----------------------------------------------------------------------- * Reader unions */ /* * For each Avro type, we have to check whether the reader schema on its * own is compatible, and whether the reader is a union that contains a * compatible type. The macros in this section help us perform both of * these checks with less code. */ /** * A helper macro that handles the case where neither writer nor reader * are unions. Uses @ref check_func to see if the two schemas are * compatible. */ #define check_non_union(saved, wschema, rschema, check_func) \ do { \ avro_resolver_t *self = NULL; \ int rc = check_func(saved, &self, wschema, rschema, \ rschema); \ if (self) { \ debug("Non-union schemas %s (writer) " \ "and %s (reader) match", \ avro_schema_type_name(wschema), \ avro_schema_type_name(rschema)); \ \ self->reader_union_branch = -1; \ return &self->parent; \ } \ \ if (rc) { \ return NULL; \ } \ } while (0) /** * Helper macro that handles the case where the reader is a union, and * the writer is not. Checks each branch of the reader union schema, * looking for the first branch that is compatible with the writer * schema. The @ref check_func argument should be a function that can * check the compatiblity of each branch schema. */ #define check_reader_union(saved, wschema, rschema, check_func) \ do { \ if (!is_avro_union(rschema)) { \ break; \ } \ \ debug("Checking reader union schema"); \ size_t num_branches = avro_schema_union_size(rschema); \ unsigned int i; \ \ for (i = 0; i < num_branches; i++) { \ avro_schema_t branch_schema = \ avro_schema_union_branch(rschema, i); \ skip_links(branch_schema); \ avro_resolver_t *self = NULL; \ int rc = check_func(saved, &self, \ wschema, branch_schema, \ rschema); \ if (self) { \ debug("Reader union branch %d (%s) " \ "and writer %s match", \ i, avro_schema_type_name(branch_schema), \ avro_schema_type_name(wschema)); \ self->reader_union_branch = i; \ return &self->parent; \ } else { \ debug("Reader union branch %d (%s) " \ "doesn't match", \ i, avro_schema_type_name(branch_schema)); \ } \ \ if (rc) { \ return NULL; \ } \ } \ \ debug("No reader union branches match"); \ } while (0) /** * A helper macro that defines wraps together check_non_union and * check_reader_union for a simple (non-union) writer schema type. */ #define check_simple_writer(saved, wschema, rschema, type_name) \ do { \ check_non_union(saved, wschema, rschema, try_##type_name); \ check_reader_union(saved, wschema, rschema, try_##type_name); \ debug("Writer %s doesn't match reader %s", \ avro_schema_type_name(wschema), \ avro_schema_type_name(rschema)); \ avro_set_error("Cannot store " #type_name " into %s", \ avro_schema_type_name(rschema)); \ return NULL; \ } while (0) /*----------------------------------------------------------------------- * primitives */ static int avro_resolver_boolean_value(avro_consumer_t *consumer, int value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %s into %p", value? "TRUE": "FALSE", dest); return avro_boolean_set(dest, value); } static int try_boolean(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_boolean(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.boolean_value = avro_resolver_boolean_value; } return 0; } static void free_bytes(void *ptr, size_t sz) { /* * The binary encoder class allocates bytes values with an extra * byte, so that they're NUL terminated. */ avro_free(ptr, sz+1); } static int avro_resolver_bytes_value(avro_consumer_t *consumer, const void *value, size_t value_len, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRIsz " bytes into %p", value_len, dest); return avro_givebytes_set(dest, (const char *) value, value_len, free_bytes); } static int try_bytes(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_bytes(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.bytes_value = avro_resolver_bytes_value; } return 0; } static int avro_resolver_double_value(avro_consumer_t *consumer, double value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %le into %p", value, dest); return avro_double_set(dest, value); } static int try_double(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_double(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.double_value = avro_resolver_double_value; } return 0; } static int avro_resolver_float_value(avro_consumer_t *consumer, float value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %e into %p", value, dest); return avro_float_set(dest, value); } static int avro_resolver_float_double_value(avro_consumer_t *consumer, float value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %e into %p (promoting float to double)", value, dest); return avro_double_set(dest, value); } static int try_float(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_float(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.float_value = avro_resolver_float_value; } else if (is_avro_double(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.float_value = avro_resolver_float_double_value; } return 0; } static int avro_resolver_int_value(avro_consumer_t *consumer, int32_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId32 " into %p", value, dest); return avro_int32_set(dest, value); } static int avro_resolver_int_long_value(avro_consumer_t *consumer, int32_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId32 " into %p (promoting int to long)", value, dest); return avro_int64_set(dest, value); } static int avro_resolver_int_double_value(avro_consumer_t *consumer, int32_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId32 " into %p (promoting int to double)", value, dest); return avro_double_set(dest, value); } static int avro_resolver_int_float_value(avro_consumer_t *consumer, int32_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId32 " into %p (promoting int to float)", value, dest); return avro_float_set(dest, (const float) value); } static int try_int(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_int32(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.int_value = avro_resolver_int_value; } else if (is_avro_int64(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.int_value = avro_resolver_int_long_value; } else if (is_avro_double(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.int_value = avro_resolver_int_double_value; } else if (is_avro_float(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.int_value = avro_resolver_int_float_value; } return 0; } static int avro_resolver_long_value(avro_consumer_t *consumer, int64_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId64 " into %p", value, dest); return avro_int64_set(dest, value); } static int avro_resolver_long_float_value(avro_consumer_t *consumer, int64_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId64 " into %p (promoting long to float)", value, dest); return avro_float_set(dest, (const float) value); } static int avro_resolver_long_double_value(avro_consumer_t *consumer, int64_t value, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing %" PRId64 " into %p (promoting long to double)", value, dest); return avro_double_set(dest, (const double) value); } static int try_long(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_int64(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.long_value = avro_resolver_long_value; } else if (is_avro_double(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.long_value = avro_resolver_long_double_value; } else if (is_avro_float(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.long_value = avro_resolver_long_float_value; } return 0; } static int avro_resolver_null_value(avro_consumer_t *consumer, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); AVRO_UNUSED(dest); debug("Storing null into %p", dest); return 0; } static int try_null(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_null(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.null_value = avro_resolver_null_value; } return 0; } static int avro_resolver_string_value(avro_consumer_t *consumer, const void *value, size_t value_len, void *user_data) { AVRO_UNUSED(value_len); avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing \"%s\" into %p", (const char *) value, dest); return avro_givestring_set(dest, (const char *) value, avro_alloc_free_func); } static int try_string(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { if (is_avro_string(rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.string_value = avro_resolver_string_value; } return 0; } /*----------------------------------------------------------------------- * arrays */ static int avro_resolver_array_start_block(avro_consumer_t *consumer, int is_first_block, unsigned int block_count, void *user_data) { if (is_first_block) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); AVRO_UNUSED(dest); debug("Starting array %p", dest); } AVRO_UNUSED(block_count); return 0; } static int avro_resolver_array_element(avro_consumer_t *consumer, unsigned int index, avro_consumer_t **element_consumer, void **element_user_data, void *user_data) { AVRO_UNUSED(index); avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Adding element to array %p", dest); /* * Allocate a new element datum and add it to the array. */ avro_schema_t array_schema = avro_datum_get_schema(dest); avro_schema_t item_schema = avro_schema_array_items(array_schema); avro_datum_t element = avro_datum_from_schema(item_schema); avro_array_append_datum(dest, element); avro_datum_decref(element); /* * Return the consumer that we allocated to process the array's * children. */ *element_consumer = resolver->child_resolvers[0]; *element_user_data = element; return 0; } static int try_array(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { /* * First verify that the reader is an array. */ if (!is_avro_array(rschema)) { return 0; } /* * Array schemas have to have compatible element schemas to be * compatible themselves. Try to create an avro_resolver_t to * check the compatibility. */ *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); avro_schema_t witems = avro_schema_array_items(wschema); avro_schema_t ritems = avro_schema_array_items(rschema); avro_consumer_t *item_consumer = avro_resolver_new_memoized(mem, witems, ritems); if (!item_consumer) { avro_memoize_delete(mem, wschema, root_rschema); avro_consumer_free(&(*resolver)->parent); avro_prefix_error("Array values aren't compatible: "); return EINVAL; } /* * The two schemas are compatible, so go ahead and create a * GavroResolver for the array. Store the item schema's * resolver into the child_resolvers field. */ (*resolver)->num_children = 1; (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *)); (*resolver)->child_resolvers[0] = item_consumer; (*resolver)->parent.array_start_block = avro_resolver_array_start_block; (*resolver)->parent.array_element = avro_resolver_array_element; return 0; } /*----------------------------------------------------------------------- * enums */ static int avro_resolver_enum_value(avro_consumer_t *consumer, int value, void *user_data) { AVRO_UNUSED(value); avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); const char *symbol_name = avro_schema_enum_get(resolver->parent.schema, value); debug("Storing symbol %s into %p", symbol_name, dest); return avro_enum_set_name(dest, symbol_name); } static int try_enum(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { /* * Enum schemas have to have the same name — but not the same * list of symbols — to be compatible. */ if (is_avro_enum(rschema)) { const char *wname = avro_schema_name(wschema); const char *rname = avro_schema_name(rschema); if (!strcmp(wname, rname)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.enum_value = avro_resolver_enum_value; } } return 0; } /*----------------------------------------------------------------------- * fixed */ static int avro_resolver_fixed_value(avro_consumer_t *consumer, const void *value, size_t value_len, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Storing (fixed) %" PRIsz " bytes into %p", value_len, dest); return avro_givefixed_set(dest, (const char *) value, value_len, avro_alloc_free_func); } static int try_fixed(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { /* * Fixed schemas need the same name and size to be compatible. */ if (avro_schema_equal(wschema, rschema)) { *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); (*resolver)->parent.fixed_value = avro_resolver_fixed_value; } return 0; } /*----------------------------------------------------------------------- * maps */ static int avro_resolver_map_start_block(avro_consumer_t *consumer, int is_first_block, unsigned int block_count, void *user_data) { if (is_first_block) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); AVRO_UNUSED(dest); debug("Starting map %p", dest); } AVRO_UNUSED(block_count); return 0; } static int avro_resolver_map_element(avro_consumer_t *consumer, unsigned int index, const char *key, avro_consumer_t **value_consumer, void **value_user_data, void *user_data) { AVRO_UNUSED(index); avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); debug("Adding element to map %p", dest); /* * Allocate a new element datum and add it to the map. */ avro_schema_t map_schema = avro_datum_get_schema(dest); avro_schema_t value_schema = avro_schema_map_values(map_schema); avro_datum_t value = avro_datum_from_schema(value_schema); avro_map_set(dest, key, value); avro_datum_decref(value); /* * Return the consumer that we allocated to process the map's * children. */ *value_consumer = resolver->child_resolvers[0]; *value_user_data = value; return 0; } static int try_map(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { /* * First verify that the reader is an map. */ if (!is_avro_map(rschema)) { return 0; } /* * Array schemas have to have compatible element schemas to be * compatible themselves. Try to create an avro_resolver_t to * check the compatibility. */ *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); avro_schema_t wvalues = avro_schema_map_values(wschema); avro_schema_t rvalues = avro_schema_map_values(rschema); avro_consumer_t *value_consumer = avro_resolver_new_memoized(mem, wvalues, rvalues); if (!value_consumer) { avro_memoize_delete(mem, wschema, root_rschema); avro_consumer_free(&(*resolver)->parent); avro_prefix_error("Map values aren't compatible: "); return EINVAL; } /* * The two schemas are compatible, so go ahead and create a * GavroResolver for the map. Store the value schema's * resolver into the child_resolvers field. */ (*resolver)->num_children = 1; (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *)); (*resolver)->child_resolvers[0] = value_consumer; (*resolver)->parent.map_start_block = avro_resolver_map_start_block; (*resolver)->parent.map_element = avro_resolver_map_element; return 0; } /*----------------------------------------------------------------------- * records */ static int avro_resolver_record_start(avro_consumer_t *consumer, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); AVRO_UNUSED(dest); debug("Starting record at %p", dest); /* * TODO: Eventually, we'll fill in default values for the extra * reader fields here. */ return 0; } static int avro_resolver_record_field(avro_consumer_t *consumer, unsigned int index, avro_consumer_t **field_consumer, void **field_user_data, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; avro_datum_t ud_dest = (avro_datum_t) user_data; avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest); const char *field_name = avro_schema_record_field_name(consumer->schema, index); /* * Grab the resolver for this field of the writer record. If * it's NULL, this this field doesn't exist in the reader * record, and should be skipped. */ debug("Retrieving resolver for writer field %i (%s)", index, field_name); if (!resolver->child_resolvers[index]) { debug("Reader doesn't have field %s, skipping", field_name); return 0; } /* * TODO: Once we can retrieve record fields by index (quickly), * use the index_mapping. */ avro_datum_t field = NULL; avro_record_get(dest, field_name, &field); *field_consumer = resolver->child_resolvers[index]; *field_user_data = field; return 0; } static int try_record(avro_memoize_t *mem, avro_resolver_t **resolver, avro_schema_t wschema, avro_schema_t rschema, avro_schema_t root_rschema) { /* * First verify that the reader is also a record, and has the * same name as the writer. */ if (!is_avro_record(rschema)) { return 0; } const char *wname = avro_schema_name(wschema); const char *rname = avro_schema_name(rschema); if (strcmp(wname, rname)) { return 0; } /* * Categorize the fields in the record schemas. Fields that are * only in the writer are ignored. Fields that are only in the * reader raise a schema mismatch error, unless the field has a * default value. Fields that are in both are resolved * recursively. * * The child_resolver array will contain an avro_resolver_t for * each field in the writer schema. To build this array, we * loop through the fields of the reader schema. If that field * is also in the writer schema, we resolve them recursively, * and store the resolver into the array. If the field isn't in * the writer schema, we raise an error. (TODO: Eventually, * we'll handle default values here.) After this loop finishes, * any NULLs in the child_resolver array will represent fields * in the writer but not the reader; these fields will be * skipped when processing the input. */ *resolver = avro_resolver_create(wschema, root_rschema); avro_memoize_set(mem, wschema, root_rschema, *resolver); size_t wfields = avro_schema_record_size(wschema); size_t rfields = avro_schema_record_size(rschema); debug("Checking writer record schema %s", wname); avro_consumer_t **child_resolvers = (avro_consumer_t **) avro_calloc(wfields, sizeof(avro_consumer_t *)); int *index_mapping = (int *) avro_calloc(wfields, sizeof(int)); unsigned int ri; for (ri = 0; ri < rfields; ri++) { avro_schema_t rfield = avro_schema_record_field_get_by_index(rschema, ri); const char *field_name = avro_schema_record_field_name(rschema, ri); debug("Resolving reader record field %u (%s)", ri, field_name); /* * See if this field is also in the writer schema. */ int wi = avro_schema_record_field_get_index(wschema, field_name); if (wi == -1) { /* * This field isn't in the writer schema — * that's an error! TODO: Handle default * values! */ debug("Field %s isn't in writer", field_name); avro_set_error("Reader field %s doesn't appear in writer", field_name); goto error; } /* * Try to recursively resolve the schemas for this * field. If they're not compatible, that's an error. */ avro_schema_t wfield = avro_schema_record_field_get_by_index(wschema, wi); avro_consumer_t *field_resolver = avro_resolver_new_memoized(mem, wfield, rfield); if (!field_resolver) { avro_prefix_error("Field %s isn't compatible: ", field_name); goto error; } /* * Save the details for this field. */ debug("Found match for field %s (%u in reader, %d in writer)", field_name, ri, wi); child_resolvers[wi] = field_resolver; index_mapping[wi] = ri; } /* * We might not have found matches for all of the writer fields, * but that's okay — any extras will be ignored. */ (*resolver)->num_children = wfields; (*resolver)->child_resolvers = child_resolvers; (*resolver)->index_mapping = index_mapping; (*resolver)->parent.record_start = avro_resolver_record_start; (*resolver)->parent.record_field = avro_resolver_record_field; return 0; error: /* * Clean up any consumer we might have already created. */ avro_memoize_delete(mem, wschema, root_rschema); avro_consumer_free(&(*resolver)->parent); { unsigned int i; for (i = 0; i < wfields; i++) { if (child_resolvers[i]) { avro_consumer_free(child_resolvers[i]); } } } avro_free(child_resolvers, wfields * sizeof(avro_consumer_t *)); avro_free(index_mapping, wfields * sizeof(int)); return EINVAL; } /*----------------------------------------------------------------------- * union */ static int avro_resolver_union_branch(avro_consumer_t *consumer, unsigned int discriminant, avro_consumer_t **branch_consumer, void **branch_user_data, void *user_data) { avro_resolver_t *resolver = (avro_resolver_t *) consumer; /* * Grab the resolver for this branch of the writer union. If * it's NULL, then this branch is incompatible with the reader. */ debug("Retrieving resolver for writer branch %u", discriminant); if (!resolver->child_resolvers[discriminant]) { avro_set_error("Writer union branch %u is incompatible " "with reader schema \"%s\"", discriminant, avro_schema_type_name(resolver->rschema)); return EINVAL; } /* * Return the branch's resolver. */ *branch_consumer = resolver->child_resolvers[discriminant]; *branch_user_data = user_data; return 0; } static avro_consumer_t * try_union(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema) { /* * For a writer union, we recursively try to resolve each branch * against the reader schema. This will work correctly whether * or not the reader is also a union — if the reader is a union, * then we'll resolve each (non-union) writer branch against the * reader union, which will be checked in our calls to * check_simple_writer below. The net result is that we might * end up trying every combination of writer and reader * branches, when looking for compatible schemas. * * Regardless of what the reader schema is, for each writer * branch, we stash away the recursive avro_resolver_t into the * child_resolvers array. A NULL entry in this array means that * that branch isn't compatible with the reader. This isn't an * immediate schema resolution error, since we allow * incompatible branches in the types as long as that branch * never appears in the actual data. We only return an error if * there are *no* branches that are compatible. */ size_t num_branches = avro_schema_union_size(wschema); debug("Checking %" PRIsz "-branch writer union schema", num_branches); avro_resolver_t *resolver = avro_resolver_create(wschema, rschema); avro_memoize_set(mem, wschema, rschema, resolver); avro_consumer_t **child_resolvers = (avro_consumer_t **) avro_calloc(num_branches, sizeof(avro_consumer_t *)); int some_branch_compatible = 0; unsigned int i; for (i = 0; i < num_branches; i++) { avro_schema_t branch_schema = avro_schema_union_branch(wschema, i); debug("Resolving writer union branch %u (%s)", i, avro_schema_type_name(branch_schema)); /* * Try to recursively resolve this branch of the writer * union. Don't raise an error if this fails — it's * okay for some of the branches to not be compatible * with the reader, as long as those branches never * appear in the input. */ child_resolvers[i] = avro_resolver_new_memoized(mem, branch_schema, rschema); if (child_resolvers[i]) { debug("Found match for writer union branch %u", i); some_branch_compatible = 1; } else { debug("No match for writer union branch %u", i); } } /* * As long as there's at least one branch that's compatible with * the reader, then we consider this schema resolution a * success. */ if (!some_branch_compatible) { debug("No writer union branches match"); avro_set_error("No branches in the writer are compatible " "with reader schema %s", avro_schema_type_name(rschema)); goto error; } resolver->num_children = num_branches; resolver->child_resolvers = child_resolvers; resolver->parent.union_branch = avro_resolver_union_branch; return &resolver->parent; error: /* * Clean up any consumer we might have already created. */ avro_memoize_delete(mem, wschema, rschema); avro_consumer_free(&resolver->parent); for (i = 0; i < num_branches; i++) { if (child_resolvers[i]) { avro_consumer_free(child_resolvers[i]); } } avro_free(child_resolvers, num_branches * sizeof(avro_consumer_t *)); return NULL; } /*----------------------------------------------------------------------- * schema type dispatcher */ static avro_consumer_t * avro_resolver_new_memoized(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema) { check_param(NULL, is_avro_schema(wschema), "writer schema"); check_param(NULL, is_avro_schema(rschema), "reader schema"); skip_links(wschema); skip_links(rschema); /* * First see if we've already matched these two schemas. If so, * just return that resolver. */ avro_resolver_t *saved = NULL; if (avro_memoize_get(mem, wschema, rschema, (void **) &saved)) { debug("Already resolved %s and %s", avro_schema_type_name(wschema), avro_schema_type_name(rschema)); return &saved->parent; } /* * Otherwise we have some work to do. */ switch (avro_typeof(wschema)) { case AVRO_BOOLEAN: check_simple_writer(mem, wschema, rschema, boolean); return NULL; case AVRO_BYTES: check_simple_writer(mem, wschema, rschema, bytes); return NULL; case AVRO_DOUBLE: check_simple_writer(mem, wschema, rschema, double); return NULL; case AVRO_FLOAT: check_simple_writer(mem, wschema, rschema, float); return NULL; case AVRO_INT32: check_simple_writer(mem, wschema, rschema, int); return NULL; case AVRO_INT64: check_simple_writer(mem, wschema, rschema, long); return NULL; case AVRO_NULL: check_simple_writer(mem, wschema, rschema, null); return NULL; case AVRO_STRING: check_simple_writer(mem, wschema, rschema, string); return NULL; case AVRO_ARRAY: check_simple_writer(mem, wschema, rschema, array); return NULL; case AVRO_ENUM: check_simple_writer(mem, wschema, rschema, enum); return NULL; case AVRO_FIXED: check_simple_writer(mem, wschema, rschema, fixed); return NULL; case AVRO_MAP: check_simple_writer(mem, wschema, rschema, map); return NULL; case AVRO_RECORD: check_simple_writer(mem, wschema, rschema, record); return NULL; case AVRO_UNION: return try_union(mem, wschema, rschema); default: avro_set_error("Unknown schema type"); return NULL; } return NULL; } avro_consumer_t * avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema) { avro_memoize_t mem; avro_memoize_init(&mem); avro_consumer_t *result = avro_resolver_new_memoized(&mem, wschema, rschema); avro_memoize_done(&mem); return result; }