summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/avro/src/resolver.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/avro/src/resolver.c')
-rw-r--r--fluent-bit/lib/avro/src/resolver.c1338
1 files changed, 1338 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/resolver.c b/fluent-bit/lib/avro/src/resolver.c
new file mode 100644
index 000000000..f0256c265
--- /dev/null
+++ b/fluent-bit/lib/avro/src/resolver.c
@@ -0,0 +1,1338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <avro/platform.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "avro/allocation.h"
+#include "avro/consumer.h"
+#include "avro/data.h"
+#include "avro/errors.h"
+#include "avro/legacy.h"
+#include "avro/schema.h"
+#include "avro_private.h"
+#include "st.h"
+
+
+/*
+ * Compile-time switch for resolver tracing. It defaults to off unless
+ * DEBUG_RESOLVER has already been defined (for instance on the compiler
+ * command line).
+ */
+#if !defined(DEBUG_RESOLVER)
+#define DEBUG_RESOLVER 0
+#endif
+
+/*
+ * debug(fmt, ...) writes a printf-style message followed by a newline
+ * to stderr when tracing is enabled. When tracing is disabled it
+ * expands to nothing, so its arguments are not evaluated.
+ *
+ * NOTE(review): the enabled form is a bare { } block rather than
+ * do { } while (0); do not use it as the unbraced body of an if/else.
+ */
+#if DEBUG_RESOLVER
+#include <stdio.h>
+#define debug(...) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, "\n"); }
+#else
+#define debug(...) /* no debug output */
+#endif
+
+
+/* Short name for the resolver structure defined just below. */
+typedef struct avro_resolver_t avro_resolver_t;
+
+/*
+ * A consumer that stores values described by a writer schema into
+ * datums of a reader schema. The writer schema is kept in
+ * parent.schema.
+ */
+struct avro_resolver_t {
+ /* Base consumer. It must remain the first member: the callbacks in
+ * this file cast avro_consumer_t pointers to avro_resolver_t. */
+ avro_consumer_t parent;
+
+ /* The reader schema for this resolver. */
+ avro_schema_t rschema;
+
+ /* An array of any child resolvers needed for the subschemas of
+ * the writer schema: a single entry for the element/value schema
+ * of an array or map, or one entry per writer record field. A
+ * NULL entry means that writer subschema has no usable resolver. */
+ avro_consumer_t **child_resolvers;
+
+ /* If the reader and writer schemas are records, this field
+ * contains a mapping from writer field indices to reader field
+ * indices. */
+ int *index_mapping;
+
+ /* The number of elements in the child_resolvers and
+ * index_mapping arrays. */
+ size_t num_children;
+
+ /* If the reader schema is a union, but the writer schema is
+ * not, this field indicates which branch of the reader union
+ * should be selected. A negative value means that no reader
+ * union branch has to be selected. */
+ int reader_union_branch;
+};
+
+
+/**
+ * Frees a resolver object, while ensuring that each resolver in a
+ * graph of resolvers is freed only once.
+ *
+ * @param consumer  the resolver to free (must be an avro_resolver_t)
+ * @param freeing   set of resolvers whose teardown has already been
+ *                  started; used to stop when a resolver is revisited
+ */
+
+static void
+avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing)
+{
+    avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+
+    /*
+     * First check if we've already started freeing this resolver.
+     */
+
+    if (st_lookup(freeing, (st_data_t) resolver, NULL)) {
+        return;
+    }
+
+    /*
+     * Otherwise add this resolver to the freeing set before touching
+     * its children, so that a child that refers back to it stops at
+     * the lookup above, and then actually free the thing.
+     */
+
+    st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL);
+
+    avro_schema_decref(resolver->parent.schema);
+    avro_schema_decref(resolver->rschema);
+    if (resolver->child_resolvers) {
+        size_t i;
+        for (i = 0; i < resolver->num_children; i++) {
+            avro_consumer_t *child = resolver->child_resolvers[i];
+            if (child) {
+                avro_resolver_free_cycles(child, freeing);
+            }
+        }
+        /*
+         * Size the release from the array's own element type so that
+         * it always matches the avro_consumer_t * allocation.
+         */
+        avro_free(resolver->child_resolvers,
+                  sizeof(*resolver->child_resolvers) * resolver->num_children);
+    }
+    if (resolver->index_mapping) {
+        avro_free(resolver->index_mapping,
+                  sizeof(*resolver->index_mapping) * resolver->num_children);
+    }
+    avro_freet(avro_resolver_t, resolver);
+}
+
+
+/**
+ * Consumer "free" callback installed on every resolver. Builds a
+ * temporary visited set, hands the resolver to
+ * avro_resolver_free_cycles(), and then discards the set.
+ */
+static void
+avro_resolver_free(avro_consumer_t *consumer)
+{
+    st_table *visited = st_init_numtable();
+
+    avro_resolver_free_cycles(consumer, visited);
+    st_free_table(visited);
+}
+
+/**
+ * Create a new avro_resolver_t instance. You must fill in the callback
+ * pointers that are appropriate for the writer schema after this
+ * function returns.
+ *
+ * The new resolver takes its own reference to both schemas, has its
+ * "free" callback set, and starts with no reader union branch selected
+ * (reader_union_branch == -1).
+ *
+ * @param wschema  the writer schema (stored in parent.schema)
+ * @param rschema  the reader schema
+ * @return the new resolver, or NULL if it could not be allocated
+ */
+
+static avro_resolver_t *
+avro_resolver_create(avro_schema_t wschema,
+                     avro_schema_t rschema)
+{
+    avro_resolver_t *resolver = (avro_resolver_t *) avro_new(avro_resolver_t);
+    if (resolver == NULL) {
+        /* Never hand a failed allocation to memset(). */
+        avro_set_error("Cannot allocate new resolver");
+        return NULL;
+    }
+    memset(resolver, 0, sizeof(avro_resolver_t));
+
+    resolver->parent.free = avro_resolver_free;
+    resolver->parent.schema = avro_schema_incref(wschema);
+    resolver->rschema = avro_schema_incref(rschema);
+    resolver->reader_union_branch = -1;
+    return resolver;
+}
+
+
+/**
+ * Returns the datum a value should actually be stored into.
+ *
+ * When the resolver has no reader union branch recorded (negative
+ * reader_union_branch), dest is returned unchanged. Otherwise dest is
+ * treated as a union datum: its discriminant is set to the recorded
+ * branch and the branch datum reported by
+ * avro_union_set_discriminant() is returned (NULL if none was
+ * reported).
+ */
+static avro_datum_t
+avro_resolver_get_real_dest(avro_resolver_t *resolver, avro_datum_t dest)
+{
+    avro_datum_t selected = NULL;
+
+    if (resolver->reader_union_branch >= 0) {
+        debug("Retrieving union branch %d for %s value",
+              resolver->reader_union_branch,
+              avro_schema_type_name(resolver->parent.schema));
+
+        avro_union_set_discriminant
+            (dest, resolver->reader_union_branch, &selected);
+        return selected;
+    }
+
+    /* The reader schema isn't a union: dest is used as-is. */
+    return dest;
+}
+
+
+/*
+ * Replaces `schema` with the target of its link for as long as it is a
+ * link schema. The argument is assigned to, so it must be a modifiable
+ * lvalue, and it is evaluated more than once.
+ */
+#define skip_links(schema) \
+ while (is_avro_link(schema)) { \
+ schema = avro_schema_link_target(schema); \
+ }
+
+
+/*-----------------------------------------------------------------------
+ * Memoized resolvers
+ */
+
+/*
+ * Forward declaration: the array, map and record checkers below call
+ * this to resolve their sub-schemas. Those callers treat a NULL result
+ * as "the two schemas are not compatible".
+ */
+static avro_consumer_t *
+avro_resolver_new_memoized(avro_memoize_t *mem,
+ avro_schema_t wschema, avro_schema_t rschema);
+
+
+/*-----------------------------------------------------------------------
+ * Reader unions
+ */
+
+/*
+ * For each Avro type, we have to check whether the reader schema on its
+ * own is compatible, and whether the reader is a union that contains a
+ * compatible type. The macros in this section help us perform both of
+ * these checks with less code.
+ */
+
+
+/**
+ * A helper macro that handles the case where neither writer nor reader
+ * is a union. Uses @ref check_func to see if the two schemas are
+ * compatible.
+ *
+ * If check_func succeeds and stores a resolver, the enclosing function
+ * returns that resolver's consumer. If check_func reports an error,
+ * the enclosing function returns NULL. Otherwise control continues
+ * after the macro.
+ *
+ * The resolver pointer is only trusted when check_func returned 0: a
+ * checker that fails may already have freed the resolver it stored
+ * through the out-pointer, so that pointer must not be returned.
+ */
+
+#define check_non_union(saved, wschema, rschema, check_func) \
+do { \
+    avro_resolver_t *self = NULL; \
+    int rc = check_func(saved, &self, wschema, rschema, \
+                        rschema); \
+    if (rc == 0 && self) { \
+        debug("Non-union schemas %s (writer) " \
+              "and %s (reader) match", \
+              avro_schema_type_name(wschema), \
+              avro_schema_type_name(rschema)); \
+        \
+        self->reader_union_branch = -1; \
+        return &self->parent; \
+    } \
+    \
+    if (rc) { \
+        return NULL; \
+    } \
+} while (0)
+
+
+/**
+ * Helper macro that handles the case where the reader is a union, and
+ * the writer is not. Checks each branch of the reader union schema,
+ * looking for the first branch that is compatible with the writer
+ * schema. The @ref check_func argument should be a function that can
+ * check the compatibility of each branch schema.
+ *
+ * Does nothing if rschema is not a union. On the first matching branch
+ * the enclosing function returns the resolver's consumer, with
+ * reader_union_branch set to that branch's index. If check_func
+ * reports an error the enclosing function returns NULL.
+ *
+ * The resolver pointer is only trusted when check_func returned 0: a
+ * checker that fails may already have freed the resolver it stored
+ * through the out-pointer, so that pointer must not be returned.
+ */
+
+#define check_reader_union(saved, wschema, rschema, check_func) \
+do { \
+    if (!is_avro_union(rschema)) { \
+        break; \
+    } \
+    \
+    debug("Checking reader union schema"); \
+    size_t num_branches = avro_schema_union_size(rschema); \
+    unsigned int i; \
+    \
+    for (i = 0; i < num_branches; i++) { \
+        avro_schema_t branch_schema = \
+            avro_schema_union_branch(rschema, i); \
+        skip_links(branch_schema); \
+        avro_resolver_t *self = NULL; \
+        int rc = check_func(saved, &self, \
+                            wschema, branch_schema, \
+                            rschema); \
+        if (rc == 0 && self) { \
+            debug("Reader union branch %d (%s) " \
+                  "and writer %s match", \
+                  i, avro_schema_type_name(branch_schema), \
+                  avro_schema_type_name(wschema)); \
+            self->reader_union_branch = i; \
+            return &self->parent; \
+        } else { \
+            debug("Reader union branch %d (%s) " \
+                  "doesn't match", \
+                  i, avro_schema_type_name(branch_schema)); \
+        } \
+        \
+        if (rc) { \
+            return NULL; \
+        } \
+    } \
+    \
+    debug("No reader union branches match"); \
+} while (0)
+
+/**
+ * A helper macro that wraps together check_non_union and
+ * check_reader_union for a simple (non-union) writer schema type.
+ *
+ * The checker used is try_<type_name>. If neither helper makes the
+ * enclosing function return, the schemas are considered incompatible:
+ * an error message is recorded and the enclosing function returns
+ * NULL. Code placed after this macro is therefore never reached.
+ */
+
+#define check_simple_writer(saved, wschema, rschema, type_name) \
+do { \
+ check_non_union(saved, wschema, rschema, try_##type_name); \
+ check_reader_union(saved, wschema, rschema, try_##type_name); \
+ debug("Writer %s doesn't match reader %s", \
+ avro_schema_type_name(wschema), \
+ avro_schema_type_name(rschema)); \
+ avro_set_error("Cannot store " #type_name " into %s", \
+ avro_schema_type_name(rschema)); \
+ return NULL; \
+} while (0)
+
+
+/*-----------------------------------------------------------------------
+ * primitives
+ */
+
+/**
+ * Consumer callback for a writer boolean: stores value into the
+ * destination datum carried in user_data (after reader-union branch
+ * selection) and returns the result of avro_boolean_set().
+ */
+static int
+avro_resolver_boolean_value(avro_consumer_t *consumer, int value,
+                            void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %s into %p", value? "TRUE": "FALSE", target);
+    return avro_boolean_set(target, value);
+}
+
+/**
+ * Checks whether a writer boolean can be stored into rschema.
+ *
+ * If rschema is a boolean, a resolver for (wschema, root_rschema) is
+ * created, memoized and stored in *resolver. Otherwise *resolver is
+ * left untouched. Always returns 0.
+ */
+static int
+try_boolean(avro_memoize_t *mem, avro_resolver_t **resolver,
+            avro_schema_t wschema, avro_schema_t rschema,
+            avro_schema_t root_rschema)
+{
+    if (!is_avro_boolean(rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.boolean_value = avro_resolver_boolean_value;
+    return 0;
+}
+
+
+/**
+ * Release callback for bytes buffers handed to avro_givebytes_set().
+ * The buffer is released with one byte more than the reported length
+ * because, per the original note here, the binary encoder allocates
+ * bytes values with an extra byte for a NUL terminator.
+ */
+static void
+free_bytes(void *ptr, size_t sz)
+{
+    size_t allocated = sz+1;
+
+    avro_free(ptr, allocated);
+}
+
+/**
+ * Consumer callback for writer bytes: gives the buffer to the
+ * destination datum via avro_givebytes_set(), registering free_bytes
+ * as its release function, and returns that call's result.
+ */
+static int
+avro_resolver_bytes_value(avro_consumer_t *consumer,
+                          const void *value, size_t value_len,
+                          void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const char *buf = (const char *) value;
+
+    debug("Storing %" PRIsz " bytes into %p", value_len, target);
+    return avro_givebytes_set(target, buf, value_len, free_bytes);
+}
+
+/**
+ * Checks whether writer bytes can be stored into rschema.
+ *
+ * If rschema is a bytes schema, a resolver for (wschema, root_rschema)
+ * is created, memoized and stored in *resolver. Otherwise *resolver is
+ * left untouched. Always returns 0.
+ */
+static int
+try_bytes(avro_memoize_t *mem, avro_resolver_t **resolver,
+          avro_schema_t wschema, avro_schema_t rschema,
+          avro_schema_t root_rschema)
+{
+    if (!is_avro_bytes(rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.bytes_value = avro_resolver_bytes_value;
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer double: stores value into the
+ * destination datum and returns the result of avro_double_set().
+ */
+static int
+avro_resolver_double_value(avro_consumer_t *consumer, double value,
+                           void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %le into %p", value, target);
+    return avro_double_set(target, value);
+}
+
+/**
+ * Checks whether a writer double can be stored into rschema.
+ *
+ * If rschema is a double, a resolver for (wschema, root_rschema) is
+ * created, memoized and stored in *resolver. Otherwise *resolver is
+ * left untouched. Always returns 0.
+ */
+static int
+try_double(avro_memoize_t *mem, avro_resolver_t **resolver,
+           avro_schema_t wschema, avro_schema_t rschema,
+           avro_schema_t root_rschema)
+{
+    if (!is_avro_double(rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.double_value = avro_resolver_double_value;
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer float read as a float: stores value
+ * into the destination datum and returns the result of
+ * avro_float_set().
+ */
+static int
+avro_resolver_float_value(avro_consumer_t *consumer, float value,
+                          void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %e into %p", value, target);
+    return avro_float_set(target, value);
+}
+
+/**
+ * Consumer callback for a writer float read as a double: the value is
+ * promoted and stored with avro_double_set(), whose result is
+ * returned.
+ */
+static int
+avro_resolver_float_double_value(avro_consumer_t *consumer, float value,
+                                 void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %e into %p (promoting float to double)", value, target);
+    return avro_double_set(target, value);
+}
+
+/**
+ * Checks whether a writer float can be stored into rschema.
+ *
+ * A float reader is stored directly; a double reader is stored with
+ * promotion. In either case a resolver for (wschema, root_rschema) is
+ * created, memoized and stored in *resolver. For any other reader
+ * schema *resolver is left untouched. Always returns 0.
+ */
+static int
+try_float(avro_memoize_t *mem, avro_resolver_t **resolver,
+          avro_schema_t wschema, avro_schema_t rschema,
+          avro_schema_t root_rschema)
+{
+    /* Pick the storing callback first, then build the resolver once. */
+    int (*store)(avro_consumer_t *, float, void *) = NULL;
+
+    if (is_avro_float(rschema)) {
+        store = avro_resolver_float_value;
+    } else if (is_avro_double(rschema)) {
+        store = avro_resolver_float_double_value;
+    }
+
+    if (store != NULL) {
+        *resolver = avro_resolver_create(wschema, root_rschema);
+        avro_memoize_set(mem, wschema, root_rschema, *resolver);
+        (*resolver)->parent.float_value = store;
+    }
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer int read as an int: stores value into
+ * the destination datum and returns the result of avro_int32_set().
+ */
+static int
+avro_resolver_int_value(avro_consumer_t *consumer, int32_t value,
+                        void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %" PRId32 " into %p", value, target);
+    return avro_int32_set(target, value);
+}
+
+/**
+ * Consumer callback for a writer int read as a long: the value is
+ * promoted and stored with avro_int64_set(), whose result is returned.
+ */
+static int
+avro_resolver_int_long_value(avro_consumer_t *consumer, int32_t value,
+                             void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %" PRId32 " into %p (promoting int to long)", value, target);
+    return avro_int64_set(target, value);
+}
+
+/**
+ * Consumer callback for a writer int read as a double: the value is
+ * promoted and stored with avro_double_set(), whose result is
+ * returned.
+ */
+static int
+avro_resolver_int_double_value(avro_consumer_t *consumer, int32_t value,
+                               void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %" PRId32 " into %p (promoting int to double)", value, target);
+    return avro_double_set(target, value);
+}
+
+/**
+ * Consumer callback for a writer int read as a float: the value is
+ * converted to float and stored with avro_float_set(), whose result is
+ * returned.
+ */
+static int
+avro_resolver_int_float_value(avro_consumer_t *consumer, int32_t value,
+                              void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const float as_float = (const float) value;
+
+    debug("Storing %" PRId32 " into %p (promoting int to float)", value, target);
+    return avro_float_set(target, as_float);
+}
+
+/**
+ * Checks whether a writer int can be stored into rschema.
+ *
+ * Reader schemas are tested in the order int, long, double, float;
+ * the first match selects the storing callback (direct store or the
+ * matching promotion). On a match a resolver for (wschema,
+ * root_rschema) is created, memoized and stored in *resolver. For any
+ * other reader schema *resolver is left untouched. Always returns 0.
+ */
+static int
+try_int(avro_memoize_t *mem, avro_resolver_t **resolver,
+        avro_schema_t wschema, avro_schema_t rschema,
+        avro_schema_t root_rschema)
+{
+    /* Pick the storing callback first, then build the resolver once. */
+    int (*store)(avro_consumer_t *, int32_t, void *) = NULL;
+
+    if (is_avro_int32(rschema)) {
+        store = avro_resolver_int_value;
+    } else if (is_avro_int64(rschema)) {
+        store = avro_resolver_int_long_value;
+    } else if (is_avro_double(rschema)) {
+        store = avro_resolver_int_double_value;
+    } else if (is_avro_float(rschema)) {
+        store = avro_resolver_int_float_value;
+    }
+
+    if (store != NULL) {
+        *resolver = avro_resolver_create(wschema, root_rschema);
+        avro_memoize_set(mem, wschema, root_rschema, *resolver);
+        (*resolver)->parent.int_value = store;
+    }
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer long read as a long: stores value
+ * into the destination datum and returns the result of
+ * avro_int64_set().
+ */
+static int
+avro_resolver_long_value(avro_consumer_t *consumer, int64_t value,
+                         void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    debug("Storing %" PRId64 " into %p", value, target);
+    return avro_int64_set(target, value);
+}
+
+/**
+ * Consumer callback for a writer long read as a float: the value is
+ * converted to float and stored with avro_float_set(), whose result is
+ * returned.
+ */
+static int
+avro_resolver_long_float_value(avro_consumer_t *consumer, int64_t value,
+                               void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const float as_float = (const float) value;
+
+    debug("Storing %" PRId64 " into %p (promoting long to float)", value, target);
+    return avro_float_set(target, as_float);
+}
+
+/**
+ * Consumer callback for a writer long read as a double: the value is
+ * converted to double and stored with avro_double_set(), whose result
+ * is returned.
+ */
+static int
+avro_resolver_long_double_value(avro_consumer_t *consumer, int64_t value,
+                                void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const double as_double = (const double) value;
+
+    debug("Storing %" PRId64 " into %p (promoting long to double)", value, target);
+    return avro_double_set(target, as_double);
+}
+
+/**
+ * Checks whether a writer long can be stored into rschema.
+ *
+ * Reader schemas are tested in the order long, double, float; the
+ * first match selects the storing callback (direct store or the
+ * matching promotion). On a match a resolver for (wschema,
+ * root_rschema) is created, memoized and stored in *resolver. For any
+ * other reader schema *resolver is left untouched. Always returns 0.
+ */
+static int
+try_long(avro_memoize_t *mem, avro_resolver_t **resolver,
+         avro_schema_t wschema, avro_schema_t rschema,
+         avro_schema_t root_rschema)
+{
+    /* Pick the storing callback first, then build the resolver once. */
+    int (*store)(avro_consumer_t *, int64_t, void *) = NULL;
+
+    if (is_avro_int64(rschema)) {
+        store = avro_resolver_long_value;
+    } else if (is_avro_double(rschema)) {
+        store = avro_resolver_long_double_value;
+    } else if (is_avro_float(rschema)) {
+        store = avro_resolver_long_float_value;
+    }
+
+    if (store != NULL) {
+        *resolver = avro_resolver_create(wschema, root_rschema);
+        avro_memoize_set(mem, wschema, root_rschema, *resolver);
+        (*resolver)->parent.long_value = store;
+    }
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer null. Nothing is stored, but the
+ * real destination is still looked up, which selects the reader union
+ * branch when one is recorded. Always returns 0.
+ */
+static int
+avro_resolver_null_value(avro_consumer_t *consumer, void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    /* target is only referenced by the (optional) trace output. */
+    AVRO_UNUSED(target);
+    debug("Storing null into %p", target);
+    return 0;
+}
+
+/**
+ * Checks whether a writer null can be stored into rschema.
+ *
+ * If rschema is a null schema, a resolver for (wschema, root_rschema)
+ * is created, memoized and stored in *resolver. Otherwise *resolver is
+ * left untouched. Always returns 0.
+ */
+static int
+try_null(avro_memoize_t *mem, avro_resolver_t **resolver,
+         avro_schema_t wschema, avro_schema_t rschema,
+         avro_schema_t root_rschema)
+{
+    if (!is_avro_null(rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.null_value = avro_resolver_null_value;
+    return 0;
+}
+
+
+/**
+ * Consumer callback for a writer string: gives the string buffer to
+ * the destination datum via avro_givestring_set(), registering
+ * avro_alloc_free_func as its release function, and returns that
+ * call's result. value_len is not used.
+ */
+static int
+avro_resolver_string_value(avro_consumer_t *consumer,
+                           const void *value, size_t value_len,
+                           void *user_data)
+{
+    AVRO_UNUSED(value_len);
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const char *text = (const char *) value;
+
+    debug("Storing \"%s\" into %p", (const char *) value, target);
+    return avro_givestring_set(target, text, avro_alloc_free_func);
+}
+
+/**
+ * Checks whether a writer string can be stored into rschema.
+ *
+ * If rschema is a string schema, a resolver for (wschema,
+ * root_rschema) is created, memoized and stored in *resolver.
+ * Otherwise *resolver is left untouched. Always returns 0.
+ */
+static int
+try_string(avro_memoize_t *mem, avro_resolver_t **resolver,
+           avro_schema_t wschema, avro_schema_t rschema,
+           avro_schema_t root_rschema)
+{
+    if (!is_avro_string(rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.string_value = avro_resolver_string_value;
+    return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * arrays
+ */
+
+/**
+ * Consumer callback invoked at the start of each array block. On the
+ * first block the real destination is looked up, which selects the
+ * reader union branch when one is recorded; later blocks do nothing.
+ * block_count is not used. Always returns 0.
+ */
+static int
+avro_resolver_array_start_block(avro_consumer_t *consumer,
+                                int is_first_block,
+                                unsigned int block_count,
+                                void *user_data)
+{
+    AVRO_UNUSED(block_count);
+
+    if (!is_first_block) {
+        return 0;
+    }
+
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    /* target is only referenced by the (optional) trace output. */
+    AVRO_UNUSED(target);
+    debug("Starting array %p", target);
+    return 0;
+}
+
+/**
+ * Consumer callback invoked for each array element. Creates a new
+ * element datum from the destination array's item schema, appends it
+ * to the destination array, and reports the child resolver and the new
+ * element so that the element's value can be consumed into it.
+ *
+ * @param consumer           the array resolver
+ * @param index              element index (unused)
+ * @param element_consumer   receives the resolver for the item schema
+ * @param element_user_data  receives the newly appended element datum
+ * @param user_data          the destination datum
+ * @return 0 on success; EINVAL if the element datum cannot be created,
+ *         or the error returned by avro_array_append_datum()
+ */
+static int
+avro_resolver_array_element(avro_consumer_t *consumer,
+                            unsigned int index,
+                            avro_consumer_t **element_consumer,
+                            void **element_user_data,
+                            void *user_data)
+{
+    AVRO_UNUSED(index);
+
+    avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+    avro_datum_t ud_dest = (avro_datum_t) user_data;
+    avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
+    debug("Adding element to array %p", dest);
+
+    /*
+     * Allocate a new element datum and add it to the array.
+     */
+
+    avro_schema_t array_schema = avro_datum_get_schema(dest);
+    avro_schema_t item_schema = avro_schema_array_items(array_schema);
+    avro_datum_t element = avro_datum_from_schema(item_schema);
+    if (!element) {
+        return EINVAL;
+    }
+
+    int rc = avro_array_append_datum(dest, element);
+
+    /*
+     * Drop our own reference. The element is only handed out below
+     * when the append succeeded. NOTE(review): this relies on the array
+     * keeping its own reference to the appended element - confirm.
+     */
+    avro_datum_decref(element);
+    if (rc) {
+        return rc;
+    }
+
+    /*
+     * Return the consumer that we allocated to process the array's
+     * children.
+     */
+
+    *element_consumer = resolver->child_resolvers[0];
+    *element_user_data = element;
+    return 0;
+}
+
+/**
+ * Checks whether a writer array can be stored into rschema.
+ *
+ * If rschema is not an array, *resolver is left untouched and 0 is
+ * returned. Otherwise the item schemas are resolved recursively. On
+ * success a resolver for (wschema, root_rschema) is stored in
+ * *resolver, with the item resolver as its only child.
+ *
+ * @return 0 on success or when rschema is not an array; EINVAL when
+ *         the item schemas are incompatible, in which case *resolver
+ *         is reset to NULL
+ */
+static int
+try_array(avro_memoize_t *mem, avro_resolver_t **resolver,
+          avro_schema_t wschema, avro_schema_t rschema,
+          avro_schema_t root_rschema)
+{
+    /*
+     * First verify that the reader is an array.
+     */
+
+    if (!is_avro_array(rschema)) {
+        return 0;
+    }
+
+    /*
+     * Array schemas have to have compatible element schemas to be
+     * compatible themselves. Create the resolver and memoize it
+     * before resolving the items, then try to resolve the items to
+     * check the compatibility.
+     */
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+
+    avro_schema_t witems = avro_schema_array_items(wschema);
+    avro_schema_t ritems = avro_schema_array_items(rschema);
+
+    avro_consumer_t *item_consumer =
+        avro_resolver_new_memoized(mem, witems, ritems);
+    if (!item_consumer) {
+        avro_memoize_delete(mem, wschema, root_rschema);
+        avro_consumer_free(&(*resolver)->parent);
+        /*
+         * The resolver is gone; never leave a dangling pointer in
+         * the caller's out-parameter.
+         */
+        *resolver = NULL;
+        avro_prefix_error("Array values aren't compatible: ");
+        return EINVAL;
+    }
+
+    /*
+     * The two schemas are compatible, so finish setting up the
+     * resolver for the array. Store the item schema's resolver into
+     * the child_resolvers field.
+     */
+
+    (*resolver)->num_children = 1;
+    (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *));
+    (*resolver)->child_resolvers[0] = item_consumer;
+    (*resolver)->parent.array_start_block = avro_resolver_array_start_block;
+    (*resolver)->parent.array_element = avro_resolver_array_element;
+
+    return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * enums
+ */
+
+/**
+ * Consumer callback for a writer enum value. The writer's symbol name
+ * for the given value is looked up in the writer schema and stored
+ * into the destination by name; the result of avro_enum_set_name() is
+ * returned.
+ */
+static int
+avro_resolver_enum_value(avro_consumer_t *consumer, int value,
+                         void *user_data)
+{
+    AVRO_UNUSED(value);
+
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const char *symbol =
+        avro_schema_enum_get(self->parent.schema, value);
+
+    debug("Storing symbol %s into %p", symbol, target);
+    return avro_enum_set_name(target, symbol);
+}
+
+/**
+ * Checks whether a writer enum can be stored into rschema.
+ *
+ * Enum schemas are treated as compatible when the reader is an enum
+ * with the same name as the writer; the symbol lists are not compared.
+ * On a match a resolver for (wschema, root_rschema) is created,
+ * memoized and stored in *resolver. Otherwise *resolver is left
+ * untouched. Always returns 0.
+ */
+static int
+try_enum(avro_memoize_t *mem, avro_resolver_t **resolver,
+         avro_schema_t wschema, avro_schema_t rschema,
+         avro_schema_t root_rschema)
+{
+    if (!is_avro_enum(rschema)) {
+        return 0;
+    }
+
+    const char *writer_name = avro_schema_name(wschema);
+    const char *reader_name = avro_schema_name(rschema);
+
+    if (strcmp(writer_name, reader_name) != 0) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.enum_value = avro_resolver_enum_value;
+    return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * fixed
+ */
+
+/**
+ * Consumer callback for a writer fixed value: gives the buffer to the
+ * destination datum via avro_givefixed_set(), registering
+ * avro_alloc_free_func as its release function, and returns that
+ * call's result.
+ */
+static int
+avro_resolver_fixed_value(avro_consumer_t *consumer,
+                          const void *value, size_t value_len,
+                          void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+    const char *buf = (const char *) value;
+
+    debug("Storing (fixed) %" PRIsz " bytes into %p", value_len, target);
+    return avro_givefixed_set(target, buf, value_len, avro_alloc_free_func);
+}
+
+/**
+ * Checks whether a writer fixed can be stored into rschema.
+ *
+ * Compatibility is decided by avro_schema_equal(wschema, rschema);
+ * per the original note here, fixed schemas need the same name and
+ * size. On a match a resolver for (wschema, root_rschema) is created,
+ * memoized and stored in *resolver. Otherwise *resolver is left
+ * untouched. Always returns 0.
+ */
+static int
+try_fixed(avro_memoize_t *mem, avro_resolver_t **resolver,
+          avro_schema_t wschema, avro_schema_t rschema,
+          avro_schema_t root_rschema)
+{
+    if (!avro_schema_equal(wschema, rschema)) {
+        return 0;
+    }
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+    (*resolver)->parent.fixed_value = avro_resolver_fixed_value;
+    return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * maps
+ */
+
+/**
+ * Consumer callback invoked at the start of each map block. On the
+ * first block the real destination is looked up, which selects the
+ * reader union branch when one is recorded; later blocks do nothing.
+ * block_count is not used. Always returns 0.
+ */
+static int
+avro_resolver_map_start_block(avro_consumer_t *consumer,
+                              int is_first_block,
+                              unsigned int block_count,
+                              void *user_data)
+{
+    AVRO_UNUSED(block_count);
+
+    if (!is_first_block) {
+        return 0;
+    }
+
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    /* target is only referenced by the (optional) trace output. */
+    AVRO_UNUSED(target);
+    debug("Starting map %p", target);
+    return 0;
+}
+
+/**
+ * Consumer callback invoked for each map entry. Creates a new value
+ * datum from the destination map's value schema, stores it in the
+ * destination map under key, and reports the child resolver and the
+ * new value datum so that the entry's value can be consumed into it.
+ *
+ * @param consumer         the map resolver
+ * @param index            entry index (unused)
+ * @param key              the entry's key
+ * @param value_consumer   receives the resolver for the value schema
+ * @param value_user_data  receives the newly stored value datum
+ * @param user_data        the destination datum
+ * @return 0 on success; EINVAL if the value datum cannot be created,
+ *         or the error returned by avro_map_set()
+ */
+static int
+avro_resolver_map_element(avro_consumer_t *consumer,
+                          unsigned int index,
+                          const char *key,
+                          avro_consumer_t **value_consumer,
+                          void **value_user_data,
+                          void *user_data)
+{
+    AVRO_UNUSED(index);
+
+    avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+    avro_datum_t ud_dest = (avro_datum_t) user_data;
+    avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
+    debug("Adding element to map %p", dest);
+
+    /*
+     * Allocate a new element datum and add it to the map.
+     */
+
+    avro_schema_t map_schema = avro_datum_get_schema(dest);
+    avro_schema_t value_schema = avro_schema_map_values(map_schema);
+    avro_datum_t value = avro_datum_from_schema(value_schema);
+    if (!value) {
+        return EINVAL;
+    }
+
+    int rc = avro_map_set(dest, key, value);
+
+    /*
+     * Drop our own reference. The value is only handed out below when
+     * it was stored successfully. NOTE(review): this relies on the map
+     * keeping its own reference to the stored value - confirm.
+     */
+    avro_datum_decref(value);
+    if (rc) {
+        return rc;
+    }
+
+    /*
+     * Return the consumer that we allocated to process the map's
+     * children.
+     */
+
+    *value_consumer = resolver->child_resolvers[0];
+    *value_user_data = value;
+    return 0;
+}
+
+/**
+ * Checks whether a writer map can be stored into rschema.
+ *
+ * If rschema is not a map, *resolver is left untouched and 0 is
+ * returned. Otherwise the value schemas are resolved recursively. On
+ * success a resolver for (wschema, root_rschema) is stored in
+ * *resolver, with the value resolver as its only child.
+ *
+ * @return 0 on success or when rschema is not a map; EINVAL when the
+ *         value schemas are incompatible, in which case *resolver is
+ *         reset to NULL
+ */
+static int
+try_map(avro_memoize_t *mem, avro_resolver_t **resolver,
+        avro_schema_t wschema, avro_schema_t rschema,
+        avro_schema_t root_rschema)
+{
+    /*
+     * First verify that the reader is a map.
+     */
+
+    if (!is_avro_map(rschema)) {
+        return 0;
+    }
+
+    /*
+     * Map schemas have to have compatible value schemas to be
+     * compatible themselves. Create the resolver and memoize it
+     * before resolving the values, then try to resolve the values to
+     * check the compatibility.
+     */
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+
+    avro_schema_t wvalues = avro_schema_map_values(wschema);
+    avro_schema_t rvalues = avro_schema_map_values(rschema);
+
+    avro_consumer_t *value_consumer =
+        avro_resolver_new_memoized(mem, wvalues, rvalues);
+    if (!value_consumer) {
+        avro_memoize_delete(mem, wschema, root_rschema);
+        avro_consumer_free(&(*resolver)->parent);
+        /*
+         * The resolver is gone; never leave a dangling pointer in
+         * the caller's out-parameter.
+         */
+        *resolver = NULL;
+        avro_prefix_error("Map values aren't compatible: ");
+        return EINVAL;
+    }
+
+    /*
+     * The two schemas are compatible, so finish setting up the
+     * resolver for the map. Store the value schema's resolver into
+     * the child_resolvers field.
+     */
+
+    (*resolver)->num_children = 1;
+    (*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *));
+    (*resolver)->child_resolvers[0] = value_consumer;
+    (*resolver)->parent.map_start_block = avro_resolver_map_start_block;
+    (*resolver)->parent.map_element = avro_resolver_map_element;
+
+    return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * records
+ */
+
+/**
+ * Consumer callback invoked at the start of a record. The real
+ * destination is looked up, which selects the reader union branch
+ * when one is recorded; nothing else is done yet. Always returns 0.
+ */
+static int
+avro_resolver_record_start(avro_consumer_t *consumer,
+                           void *user_data)
+{
+    avro_resolver_t *self = (avro_resolver_t *) consumer;
+    avro_datum_t target =
+        avro_resolver_get_real_dest(self, (avro_datum_t) user_data);
+
+    /* target is only referenced by the (optional) trace output. */
+    AVRO_UNUSED(target);
+    debug("Starting record at %p", target);
+
+    /*
+     * TODO: Eventually, default values for the extra reader fields
+     * will be filled in here.
+     */
+
+    return 0;
+}
+
+/**
+ * Consumer callback invoked for each field of a writer record.
+ *
+ * If the writer field has a resolver, the reader field of the same
+ * name is fetched from the destination record and reported together
+ * with that resolver. If the writer field has no resolver (the reader
+ * does not have the field), the output parameters are left untouched
+ * and 0 is returned so that the field can be skipped.
+ *
+ * @param consumer         the record resolver
+ * @param index            index of the field in the writer schema
+ * @param field_consumer   receives the resolver for the field
+ * @param field_user_data  receives the destination field datum
+ * @param user_data        the destination datum
+ * @return 0 on success; EINVAL if index is out of range; otherwise the
+ *         error returned by avro_record_get()
+ */
+static int
+avro_resolver_record_field(avro_consumer_t *consumer,
+                           unsigned int index,
+                           avro_consumer_t **field_consumer,
+                           void **field_user_data,
+                           void *user_data)
+{
+    avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+
+    /*
+     * child_resolvers has num_children entries; refuse to index past
+     * the end of it.
+     */
+
+    if (index >= resolver->num_children) {
+        avro_set_error("Record field index %u is out of range", index);
+        return EINVAL;
+    }
+
+    avro_datum_t ud_dest = (avro_datum_t) user_data;
+    avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
+
+    const char *field_name =
+        avro_schema_record_field_name(consumer->schema, index);
+
+    /*
+     * Grab the resolver for this field of the writer record. If
+     * it's NULL, then this field doesn't exist in the reader
+     * record, and should be skipped.
+     */
+
+    debug("Retrieving resolver for writer field %i (%s)",
+          index, field_name);
+
+    if (!resolver->child_resolvers[index]) {
+        debug("Reader doesn't have field %s, skipping", field_name);
+        return 0;
+    }
+
+    /*
+     * TODO: Once we can retrieve record fields by index (quickly),
+     * use the index_mapping.
+     */
+
+    avro_datum_t field = NULL;
+    int rc = avro_record_get(dest, field_name, &field);
+    if (rc) {
+        /*
+         * Don't hand a missing destination field to the child
+         * resolver.
+         */
+        return rc;
+    }
+
+    *field_consumer = resolver->child_resolvers[index];
+    *field_user_data = field;
+    return 0;
+}
+
+/**
+ * Checks whether a writer record can be stored into rschema.
+ *
+ * If rschema is not a record, or its name differs from the writer's,
+ * *resolver is left untouched and 0 is returned. Otherwise every
+ * reader field must also exist in the writer and have a compatible
+ * schema. On success a resolver for (wschema, root_rschema) is stored
+ * in *resolver, with one child slot per writer field.
+ *
+ * @return 0 on success or when the reader is not a matching record;
+ *         EINVAL when a reader field is missing from the writer or is
+ *         incompatible, in which case *resolver is reset to NULL
+ */
+static int
+try_record(avro_memoize_t *mem, avro_resolver_t **resolver,
+           avro_schema_t wschema, avro_schema_t rschema,
+           avro_schema_t root_rschema)
+{
+    /*
+     * First verify that the reader is also a record, and has the
+     * same name as the writer.
+     */
+
+    if (!is_avro_record(rschema)) {
+        return 0;
+    }
+
+    const char *wname = avro_schema_name(wschema);
+    const char *rname = avro_schema_name(rschema);
+
+    if (strcmp(wname, rname)) {
+        return 0;
+    }
+
+    /*
+     * Categorize the fields in the record schemas. Fields that are
+     * only in the writer are ignored. Fields that are only in the
+     * reader raise a schema mismatch error. (TODO: Eventually,
+     * default values will be handled here.) Fields that are in both
+     * are resolved recursively.
+     *
+     * The child_resolvers array will contain an avro_resolver_t for
+     * each field in the writer schema. To build this array, we
+     * loop through the fields of the reader schema. If that field
+     * is also in the writer schema, we resolve them recursively,
+     * and store the resolver into the array. If the field isn't in
+     * the writer schema, we raise an error. After this loop
+     * finishes, any NULLs in the child_resolvers array will
+     * represent fields in the writer but not the reader; these
+     * fields will be skipped when processing the input.
+     */
+
+    *resolver = avro_resolver_create(wschema, root_rschema);
+    avro_memoize_set(mem, wschema, root_rschema, *resolver);
+
+    size_t wfields = avro_schema_record_size(wschema);
+    size_t rfields = avro_schema_record_size(rschema);
+
+    debug("Checking writer record schema %s", wname);
+
+    avro_consumer_t **child_resolvers =
+        (avro_consumer_t **) avro_calloc(wfields, sizeof(avro_consumer_t *));
+    int *index_mapping = (int *) avro_calloc(wfields, sizeof(int));
+
+    unsigned int ri;
+    for (ri = 0; ri < rfields; ri++) {
+        avro_schema_t rfield =
+            avro_schema_record_field_get_by_index(rschema, ri);
+        const char *field_name =
+            avro_schema_record_field_name(rschema, ri);
+
+        debug("Resolving reader record field %u (%s)", ri, field_name);
+
+        /*
+         * See if this field is also in the writer schema.
+         */
+
+        int wi = avro_schema_record_field_get_index(wschema, field_name);
+
+        if (wi == -1) {
+            /*
+             * This field isn't in the writer schema -
+             * that's an error! TODO: Handle default
+             * values!
+             */
+
+            debug("Field %s isn't in writer", field_name);
+            avro_set_error("Reader field %s doesn't appear in writer",
+                           field_name);
+            goto error;
+        }
+
+        /*
+         * Try to recursively resolve the schemas for this
+         * field. If they're not compatible, that's an error.
+         */
+
+        avro_schema_t wfield =
+            avro_schema_record_field_get_by_index(wschema, wi);
+        avro_consumer_t *field_resolver =
+            avro_resolver_new_memoized(mem, wfield, rfield);
+
+        if (!field_resolver) {
+            avro_prefix_error("Field %s isn't compatible: ", field_name);
+            goto error;
+        }
+
+        /*
+         * Save the details for this field.
+         */
+
+        debug("Found match for field %s (%u in reader, %d in writer)",
+              field_name, ri, wi);
+        child_resolvers[wi] = field_resolver;
+        index_mapping[wi] = ri;
+    }
+
+    /*
+     * We might not have found matches for all of the writer fields,
+     * but that's okay - any extras will be ignored.
+     */
+
+    (*resolver)->num_children = wfields;
+    (*resolver)->child_resolvers = child_resolvers;
+    (*resolver)->index_mapping = index_mapping;
+    (*resolver)->parent.record_start = avro_resolver_record_start;
+    (*resolver)->parent.record_field = avro_resolver_record_field;
+    return 0;
+
+error:
+    /*
+     * Clean up any consumer we might have already created. The
+     * child arrays were never attached to the resolver, so they are
+     * released separately below.
+     */
+
+    avro_memoize_delete(mem, wschema, root_rschema);
+    avro_consumer_free(&(*resolver)->parent);
+
+    /*
+     * The resolver is gone; never leave a dangling pointer in the
+     * caller's out-parameter.
+     */
+    *resolver = NULL;
+
+    /*
+     * NOTE(review): each child is freed with its own call; if
+     * avro_resolver_new_memoized() can return the same resolver for
+     * two fields, this loop would free it twice - confirm.
+     */
+    {
+        unsigned int i;
+        for (i = 0; i < wfields; i++) {
+            if (child_resolvers[i]) {
+                avro_consumer_free(child_resolvers[i]);
+            }
+        }
+    }
+
+    avro_free(child_resolvers, wfields * sizeof(avro_consumer_t *));
+    avro_free(index_mapping, wfields * sizeof(int));
+    return EINVAL;
+}
+
+
+/*-----------------------------------------------------------------------
+ * union
+ */
+
+/*
+ * Consumer callback for a writer union: selects the child resolver that
+ * was built for the writer branch identified by `discriminant`.
+ *
+ * consumer          the union resolver (an avro_resolver_t)
+ * discriminant      index of the writer union branch
+ * branch_consumer   out: the resolver to use for that branch
+ * branch_user_data  out: receives user_data unchanged
+ * user_data         opaque pointer forwarded to the branch
+ *
+ * Returns 0 on success.  Returns EINVAL, with the Avro error string
+ * set, if the discriminant is not a valid index into the resolver's
+ * child array, or if the selected branch has no resolver (i.e. it was
+ * found incompatible with the reader schema).  The out parameters are
+ * left untouched on error.
+ */
+static int
+avro_resolver_union_branch(avro_consumer_t *consumer,
+			   unsigned int discriminant,
+			   avro_consumer_t **branch_consumer,
+			   void **branch_user_data,
+			   void *user_data)
+{
+	avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+
+	debug("Retrieving resolver for writer branch %u", discriminant);
+
+	/*
+	 * The discriminant is supplied by the caller and nothing in this
+	 * function guarantees it is in range, so validate it before
+	 * using it as an index.  try_union() sizes child_resolvers to
+	 * num_children entries.
+	 */
+
+	if ((size_t) discriminant >= (size_t) resolver->num_children) {
+		avro_set_error("Writer union branch %u is out of range",
+			       discriminant);
+		return EINVAL;
+	}
+
+	/*
+	 * Grab the resolver for this branch of the writer union.  If
+	 * it's NULL, then this branch is incompatible with the reader.
+	 */
+
+	avro_consumer_t *branch = resolver->child_resolvers[discriminant];
+	if (!branch) {
+		avro_set_error("Writer union branch %u is incompatible "
+			       "with reader schema \"%s\"",
+			       discriminant, avro_schema_type_name(resolver->rschema));
+		return EINVAL;
+	}
+
+	/*
+	 * Return the branch's resolver.
+	 */
+
+	*branch_consumer = branch;
+	*branch_user_data = user_data;
+	return 0;
+}
+
+/*
+ * Builds a resolver for a writer schema that is a union.
+ *
+ * mem      memoization table shared by the whole resolution pass
+ * wschema  the writer union schema
+ * rschema  the reader schema (union or not)
+ *
+ * Returns the new resolver's consumer on success.  Returns NULL, with
+ * the Avro error string set, if the branch table cannot be allocated
+ * or if no writer branch is compatible with the reader.
+ */
+static avro_consumer_t *
+try_union(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema)
+{
+	/*
+	 * For a writer union, we recursively try to resolve each branch
+	 * against the reader schema.  This works whether or not the
+	 * reader is also a union: each writer branch is simply handed
+	 * to avro_resolver_new_memoized together with the reader
+	 * schema, whatever it is.
+	 *
+	 * For each writer branch, we stash the resulting consumer into
+	 * the child_resolvers array.  A NULL entry in this array means
+	 * that that branch isn't compatible with the reader.  This
+	 * isn't an immediate schema resolution error, since we allow
+	 * incompatible branches in the types as long as that branch
+	 * never appears in the actual data.  We only return an error if
+	 * there are *no* branches that are compatible.
+	 */
+
+	size_t num_branches = avro_schema_union_size(wschema);
+	debug("Checking %" PRIsz "-branch writer union schema", num_branches);
+
+	/*
+	 * Allocate the branch table before anything is created or
+	 * memoized, so that an allocation failure needs no cleanup.
+	 * A NULL result is only treated as a failure when entries were
+	 * actually requested (NOTE(review): what avro_calloc returns
+	 * for a zero-sized request isn't visible here).
+	 */
+
+	avro_consumer_t **child_resolvers =
+	    (avro_consumer_t **) avro_calloc(num_branches, sizeof(avro_consumer_t *));
+	if (!child_resolvers && num_branches > 0) {
+		avro_set_error("Cannot allocate resolvers for writer union branches");
+		return NULL;
+	}
+
+	/*
+	 * Register the resolver before recursing, so that any nested
+	 * lookup of this same (writer, reader) pair through
+	 * avro_resolver_new_memoized gets this resolver back.
+	 */
+
+	avro_resolver_t *resolver = avro_resolver_create(wschema, rschema);
+	avro_memoize_set(mem, wschema, rschema, resolver);
+
+	int some_branch_compatible = 0;
+
+	unsigned int i;
+	for (i = 0; i < num_branches; i++) {
+		avro_schema_t branch_schema =
+		    avro_schema_union_branch(wschema, i);
+
+		debug("Resolving writer union branch %u (%s)",
+		      i, avro_schema_type_name(branch_schema));
+
+		/*
+		 * Try to recursively resolve this branch of the writer
+		 * union.  Don't raise an error if this fails — it's
+		 * okay for some of the branches to not be compatible
+		 * with the reader, as long as those branches never
+		 * appear in the input.
+		 */
+
+		child_resolvers[i] =
+		    avro_resolver_new_memoized(mem, branch_schema, rschema);
+		if (child_resolvers[i]) {
+			debug("Found match for writer union branch %u", i);
+			some_branch_compatible = 1;
+		} else {
+			debug("No match for writer union branch %u", i);
+		}
+	}
+
+	/*
+	 * As long as there's at least one branch that's compatible with
+	 * the reader, then we consider this schema resolution a
+	 * success.
+	 */
+
+	if (!some_branch_compatible) {
+		debug("No writer union branches match");
+		avro_set_error("No branches in the writer are compatible "
+			       "with reader schema %s",
+			       avro_schema_type_name(rschema));
+		goto error;
+	}
+
+	resolver->num_children = num_branches;
+	resolver->child_resolvers = child_resolvers;
+	resolver->parent.union_branch = avro_resolver_union_branch;
+	return &resolver->parent;
+
+error:
+	/*
+	 * Clean up any consumer we might have already created.  The
+	 * branch table has not been attached to the resolver yet, so it
+	 * is released separately below.
+	 */
+
+	avro_memoize_delete(mem, wschema, rschema);
+	avro_consumer_free(&resolver->parent);
+
+	for (i = 0; i < num_branches; i++) {
+		if (child_resolvers[i]) {
+			avro_consumer_free(child_resolvers[i]);
+		}
+	}
+
+	avro_free(child_resolvers, num_branches * sizeof(avro_consumer_t *));
+	return NULL;
+}
+
+
+/*-----------------------------------------------------------------------
+ * schema type dispatcher
+ */
+
+/*
+ * Returns a consumer that resolves data written with wschema against
+ * the reader schema rschema, reusing an already-built resolver from
+ * the memoization table `mem` when this pair has been seen before.
+ *
+ * Dispatches on the type of the writer schema.  Writer unions are
+ * handled by try_union; every other type goes through the
+ * check_simple_writer macro.  Returns NULL when no resolver could be
+ * produced.
+ *
+ * NOTE(review): check_param, skip_links and check_simple_writer are
+ * macros defined outside this function.  The comments below describe
+ * what they appear to do from how they are used here — confirm
+ * against their definitions.
+ */
+static avro_consumer_t *
+avro_resolver_new_memoized(avro_memoize_t *mem,
+ avro_schema_t wschema, avro_schema_t rschema)
+{
+ /*
+ * Presumably returns NULL from this function if either argument
+ * is not a valid schema.
+ */
+ check_param(NULL, is_avro_schema(wschema), "writer schema");
+ check_param(NULL, is_avro_schema(rschema), "reader schema");
+
+ /*
+ * Presumably replaces each schema with the target of any link
+ * schema it refers to, so that the lookups below operate on the
+ * real schemas.
+ */
+ skip_links(wschema);
+ skip_links(rschema);
+
+ /*
+ * First see if we've already matched these two schemas. If so,
+ * just return that resolver.
+ */
+
+ avro_resolver_t *saved = NULL;
+ if (avro_memoize_get(mem, wschema, rschema, (void **) &saved)) {
+ debug("Already resolved %s and %s",
+ avro_schema_type_name(wschema),
+ avro_schema_type_name(rschema));
+ return &saved->parent;
+ }
+
+ /*
+ * Otherwise we have some work to do.
+ *
+ * Each non-union case invokes check_simple_writer and then
+ * returns NULL. Since a successful resolution has to be reported
+ * somehow, the macro presumably returns the new resolver from
+ * this function itself on success, leaving the `return NULL`
+ * that follows it as the failure path.
+ */
+
+ switch (avro_typeof(wschema))
+ {
+ case AVRO_BOOLEAN:
+ check_simple_writer(mem, wschema, rschema, boolean);
+ return NULL;
+
+ case AVRO_BYTES:
+ check_simple_writer(mem, wschema, rschema, bytes);
+ return NULL;
+
+ case AVRO_DOUBLE:
+ check_simple_writer(mem, wschema, rschema, double);
+ return NULL;
+
+ case AVRO_FLOAT:
+ check_simple_writer(mem, wschema, rschema, float);
+ return NULL;
+
+ case AVRO_INT32:
+ check_simple_writer(mem, wschema, rschema, int);
+ return NULL;
+
+ case AVRO_INT64:
+ check_simple_writer(mem, wschema, rschema, long);
+ return NULL;
+
+ case AVRO_NULL:
+ check_simple_writer(mem, wschema, rschema, null);
+ return NULL;
+
+ case AVRO_STRING:
+ check_simple_writer(mem, wschema, rschema, string);
+ return NULL;
+
+ case AVRO_ARRAY:
+ check_simple_writer(mem, wschema, rschema, array);
+ return NULL;
+
+ case AVRO_ENUM:
+ check_simple_writer(mem, wschema, rschema, enum);
+ return NULL;
+
+ case AVRO_FIXED:
+ check_simple_writer(mem, wschema, rschema, fixed);
+ return NULL;
+
+ case AVRO_MAP:
+ check_simple_writer(mem, wschema, rschema, map);
+ return NULL;
+
+ case AVRO_RECORD:
+ check_simple_writer(mem, wschema, rschema, record);
+ return NULL;
+
+ /*
+ * A writer union is resolved branch by branch.
+ */
+ case AVRO_UNION:
+ return try_union(mem, wschema, rschema);
+
+ default:
+ avro_set_error("Unknown schema type");
+ return NULL;
+ }
+
+ /*
+ * Every case above returns, so this is only a fallback.
+ */
+ return NULL;
+}
+
+
+/*
+ * Public entry point: builds a consumer that resolves data written
+ * with wschema against the reader schema rschema.
+ *
+ * A memoization table is set up for the duration of this call, passed
+ * to avro_resolver_new_memoized, and torn down again before
+ * returning.  The result of avro_resolver_new_memoized is returned
+ * unchanged, including NULL.
+ */
+avro_consumer_t *
+avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+{
+	avro_memoize_t memo;
+	avro_consumer_t *root;
+
+	avro_memoize_init(&memo);
+	root = avro_resolver_new_memoized(&memo, wschema, rschema);
+	avro_memoize_done(&memo);
+
+	return root;
+}