diff options
Diffstat (limited to 'fluent-bit/lib/avro/src/datum.c')
-rw-r--r-- | fluent-bit/lib/avro/src/datum.c | 1255 |
1 files changed, 1255 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/datum.c b/fluent-bit/lib/avro/src/datum.c new file mode 100644 index 00000000..2c427809 --- /dev/null +++ b/fluent-bit/lib/avro/src/datum.c @@ -0,0 +1,1255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include "avro/allocation.h" +#include "avro/basics.h" +#include "avro/errors.h" +#include "avro/legacy.h" +#include "avro/refcount.h" +#include "avro/schema.h" +#include "avro_private.h" +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include "datum.h" +#include "schema.h" +#include "encoding.h" + +#define DEFAULT_TABLE_SIZE 32 + +static void avro_datum_init(avro_datum_t datum, avro_type_t type) +{ + datum->type = type; + datum->class_type = AVRO_DATUM; + avro_refcount_set(&datum->refcount, 1); +} + +static void +avro_str_free_wrapper(void *ptr, size_t sz) +{ + // don't need sz, since the size is stored in the string buffer + AVRO_UNUSED(sz); + avro_str_free((char *)ptr); +} + +static avro_datum_t avro_string_private(char *str, int64_t size, + avro_free_func_t string_free) +{ + struct avro_string_datum_t *datum = + (struct avro_string_datum_t *) avro_new(struct avro_string_datum_t); + if (!datum) { + avro_set_error("Cannot create new string datum"); + return NULL; + } + datum->s = str; + datum->size = size; + datum->free = string_free; + + avro_datum_init(&datum->obj, AVRO_STRING); + return &datum->obj; +} + +avro_datum_t avro_string(const char *str) +{ + char *p = avro_strdup(str); + if (!p) { + avro_set_error("Cannot copy string content"); + return NULL; + } + avro_datum_t s_datum = avro_string_private(p, 0, avro_str_free_wrapper); + if (!s_datum) { + avro_str_free(p); + } + + return s_datum; +} + +avro_datum_t avro_givestring(const char *str, + avro_free_func_t free) +{ + int64_t sz = strlen(str)+1; + return avro_string_private((char *)str, sz, free); +} + +int avro_string_get(avro_datum_t datum, char **p) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_string(datum), "string datum"); + check_param(EINVAL, p, "string buffer"); + + *p = avro_datum_to_string(datum)->s; + return 0; +} + +static int avro_string_set_private(avro_datum_t datum, + const char *p, int64_t size, + avro_free_func_t string_free) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_string(datum), "string datum"); + check_param(EINVAL, p, "string content"); + + struct avro_string_datum_t *string = avro_datum_to_string(datum); + if (string->free) { + string->free(string->s, string->size); + } + string->free = string_free; + string->s = (char *)p; + string->size = size; + return 0; +} + +int avro_string_set(avro_datum_t datum, const char *p) +{ + char *string_copy = avro_strdup(p); + int rval; + if (!string_copy) { + avro_set_error("Cannot copy string content"); + return ENOMEM; + } + rval = avro_string_set_private(datum, string_copy, 0, + avro_str_free_wrapper); + if (rval) { + avro_str_free(string_copy); + } + return rval; +} + +int avro_givestring_set(avro_datum_t datum, const char *p, + avro_free_func_t free) +{ + int64_t size = strlen(p)+1; + return avro_string_set_private(datum, p, size, free); +} + +static avro_datum_t avro_bytes_private(char *bytes, int64_t size, + avro_free_func_t bytes_free) +{ + struct avro_bytes_datum_t *datum; + datum = (struct avro_bytes_datum_t *) avro_new(struct avro_bytes_datum_t); + if (!datum) { + avro_set_error("Cannot create new bytes datum"); + return NULL; + } + datum->bytes = bytes; + datum->size = size; + datum->free = bytes_free; + + avro_datum_init(&datum->obj, AVRO_BYTES); + return &datum->obj; +} + +avro_datum_t avro_bytes(const char *bytes, int64_t size) +{ + char *bytes_copy = (char *) avro_malloc(size); + if (!bytes_copy) { + avro_set_error("Cannot copy bytes content"); + return NULL; + } + memcpy(bytes_copy, bytes, size); + avro_datum_t result = + avro_bytes_private(bytes_copy, size, avro_alloc_free_func); + if (result == NULL) { + avro_free(bytes_copy, size); + } + return result; +} + +avro_datum_t avro_givebytes(const char *bytes, int64_t size, + avro_free_func_t free) +{ + return avro_bytes_private((char *)bytes, size, free); +} + +static int avro_bytes_set_private(avro_datum_t datum, const char *bytes, + const int64_t size, + avro_free_func_t bytes_free) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_bytes(datum), "bytes datum"); + + struct avro_bytes_datum_t *b = avro_datum_to_bytes(datum); + if (b->free) { + b->free(b->bytes, b->size); + } + + b->free = bytes_free; + b->bytes = (char *)bytes; + b->size = size; + return 0; +} + +int avro_bytes_set(avro_datum_t datum, const char *bytes, const int64_t size) +{ + int rval; + char *bytes_copy = (char *) avro_malloc(size); + if (!bytes_copy) { + avro_set_error("Cannot copy bytes content"); + return ENOMEM; + } + memcpy(bytes_copy, bytes, size); + rval = avro_bytes_set_private(datum, bytes_copy, size, avro_alloc_free_func); + if (rval) { + avro_free(bytes_copy, size); + } + return rval; +} + +int avro_givebytes_set(avro_datum_t datum, const char *bytes, + const int64_t size, avro_free_func_t free) +{ + return avro_bytes_set_private(datum, bytes, size, free); +} + +int avro_bytes_get(avro_datum_t datum, char **bytes, int64_t * size) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_bytes(datum), "bytes datum"); + check_param(EINVAL, bytes, "bytes"); + check_param(EINVAL, size, "size"); + + *bytes = avro_datum_to_bytes(datum)->bytes; + *size = avro_datum_to_bytes(datum)->size; + return 0; +} + +avro_datum_t avro_int32(int32_t i) +{ + struct avro_int32_datum_t *datum = + (struct avro_int32_datum_t *) avro_new(struct avro_int32_datum_t); + if (!datum) { + avro_set_error("Cannot create new int datum"); + return NULL; + } + datum->i32 = i; + + avro_datum_init(&datum->obj, AVRO_INT32); + return &datum->obj; +} + +int avro_int32_get(avro_datum_t datum, int32_t * i) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_int32(datum), "int datum"); + check_param(EINVAL, i, "value pointer"); + + *i = avro_datum_to_int32(datum)->i32; + return 0; +} + +int avro_int32_set(avro_datum_t datum, const int32_t i) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_int32(datum), "int datum"); + + avro_datum_to_int32(datum)->i32 = i; + return 0; +} + +avro_datum_t avro_int64(int64_t l) +{ + struct avro_int64_datum_t *datum = + (struct avro_int64_datum_t *) avro_new(struct avro_int64_datum_t); + if (!datum) { + avro_set_error("Cannot create new long datum"); + return NULL; + } + datum->i64 = l; + + avro_datum_init(&datum->obj, AVRO_INT64); + return &datum->obj; +} + +int avro_int64_get(avro_datum_t datum, int64_t * l) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_int64(datum), "long datum"); + check_param(EINVAL, l, "value pointer"); + + *l = avro_datum_to_int64(datum)->i64; + return 0; +} + +int avro_int64_set(avro_datum_t datum, const int64_t l) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_int64(datum), "long datum"); + + avro_datum_to_int64(datum)->i64 = l; + return 0; +} + +avro_datum_t avro_float(float f) +{ + struct avro_float_datum_t *datum = + (struct avro_float_datum_t *) avro_new(struct avro_float_datum_t); + if (!datum) { + avro_set_error("Cannot create new float datum"); + return NULL; + } + datum->f = f; + + avro_datum_init(&datum->obj, AVRO_FLOAT); + return &datum->obj; +} + +int avro_float_set(avro_datum_t datum, const float f) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_float(datum), "float datum"); + + avro_datum_to_float(datum)->f = f; + return 0; +} + +int avro_float_get(avro_datum_t datum, float *f) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_float(datum), "float datum"); + check_param(EINVAL, f, "value pointer"); + + *f = avro_datum_to_float(datum)->f; + return 0; +} + +avro_datum_t avro_double(double d) +{ + struct avro_double_datum_t *datum = + (struct avro_double_datum_t *) avro_new(struct avro_double_datum_t); + if (!datum) { + avro_set_error("Cannot create new double atom"); + return NULL; + } + datum->d = d; + + avro_datum_init(&datum->obj, AVRO_DOUBLE); + return &datum->obj; +} + +int avro_double_set(avro_datum_t datum, const double d) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_double(datum), "double datum"); + + avro_datum_to_double(datum)->d = d; + return 0; +} + +int avro_double_get(avro_datum_t datum, double *d) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_double(datum), "double datum"); + check_param(EINVAL, d, "value pointer"); + + *d = avro_datum_to_double(datum)->d; + return 0; +} + +avro_datum_t avro_boolean(int8_t i) +{ + struct avro_boolean_datum_t *datum = + (struct avro_boolean_datum_t *) avro_new(struct avro_boolean_datum_t); + if (!datum) { + avro_set_error("Cannot create new boolean datum"); + return NULL; + } + datum->i = i; + avro_datum_init(&datum->obj, AVRO_BOOLEAN); + return &datum->obj; +} + +int avro_boolean_set(avro_datum_t datum, const int8_t i) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_boolean(datum), "boolean datum"); + + avro_datum_to_boolean(datum)->i = i; + return 0; +} + +int avro_boolean_get(avro_datum_t datum, int8_t * i) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_boolean(datum), "boolean datum"); + check_param(EINVAL, i, "value pointer"); + + *i = avro_datum_to_boolean(datum)->i; + return 0; +} + +avro_datum_t avro_null(void) +{ + static struct avro_obj_t obj = { + AVRO_NULL, + AVRO_DATUM, + 1 + }; + return avro_datum_incref(&obj); +} + +avro_datum_t avro_union(avro_schema_t schema, + int64_t discriminant, avro_datum_t value) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + + struct avro_union_datum_t *datum = + (struct avro_union_datum_t *) avro_new(struct avro_union_datum_t); + if (!datum) { + avro_set_error("Cannot create new union datum"); + return NULL; + } + datum->schema = avro_schema_incref(schema); + datum->discriminant = discriminant; + datum->value = avro_datum_incref(value); + + avro_datum_init(&datum->obj, AVRO_UNION); + return &datum->obj; +} + +int64_t avro_union_discriminant(const avro_datum_t datum) +{ + return avro_datum_to_union(datum)->discriminant; +} + +avro_datum_t avro_union_current_branch(avro_datum_t datum) +{ + return avro_datum_to_union(datum)->value; +} + +int avro_union_set_discriminant(avro_datum_t datum, + int discriminant, + avro_datum_t *branch) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_union(datum), "union datum"); + + struct avro_union_datum_t *unionp = avro_datum_to_union(datum); + + avro_schema_t schema = unionp->schema; + avro_schema_t branch_schema = + avro_schema_union_branch(schema, discriminant); + + if (branch_schema == NULL) { + // That branch doesn't exist! + avro_set_error("Branch %d doesn't exist", discriminant); + return EINVAL; + } + + if (unionp->discriminant != discriminant) { + // If we're changing the branch, throw away any old + // branch value. + if (unionp->value != NULL) { + avro_datum_decref(unionp->value); + unionp->value = NULL; + } + + unionp->discriminant = discriminant; + } + + // Create a new branch value, if there isn't one already. + if (unionp->value == NULL) { + unionp->value = avro_datum_from_schema(branch_schema); + } + + if (branch != NULL) { + *branch = unionp->value; + } + + return 0; +} + +avro_datum_t avro_record(avro_schema_t schema) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + + struct avro_record_datum_t *datum = + (struct avro_record_datum_t *) avro_new(struct avro_record_datum_t); + if (!datum) { + avro_set_error("Cannot create new record datum"); + return NULL; + } + datum->field_order = st_init_numtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->field_order) { + avro_set_error("Cannot create new record datum"); + avro_freet(struct avro_record_datum_t, datum); + return NULL; + } + datum->fields_byname = st_init_strtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->fields_byname) { + avro_set_error("Cannot create new record datum"); + st_free_table(datum->field_order); + avro_freet(struct avro_record_datum_t, datum); + return NULL; + } + + datum->schema = avro_schema_incref(schema); + avro_datum_init(&datum->obj, AVRO_RECORD); + return &datum->obj; +} + +int +avro_record_get(const avro_datum_t datum, const char *field_name, + avro_datum_t * field) +{ + union { + avro_datum_t field; + st_data_t data; + } val; + if (is_avro_datum(datum) && is_avro_record(datum) && field_name) { + if (st_lookup + (avro_datum_to_record(datum)->fields_byname, + (st_data_t) field_name, &(val.data))) { + *field = val.field; + return 0; + } + } + avro_set_error("No field named %s", field_name); + return EINVAL; +} + +int +avro_record_set(avro_datum_t datum, const char *field_name, + const avro_datum_t field_value) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_record(datum), "record datum"); + check_param(EINVAL, field_name, "field_name"); + + char *key = (char *)field_name; + avro_datum_t old_field; + + if (avro_record_get(datum, field_name, &old_field) == 0) { + /* Overriding old value */ + avro_datum_decref(old_field); + } else { + /* Inserting new value */ + struct avro_record_datum_t *record = + avro_datum_to_record(datum); + key = avro_strdup(field_name); + if (!key) { + avro_set_error("Cannot copy field name"); + return ENOMEM; + } + st_insert(record->field_order, + record->field_order->num_entries, + (st_data_t) key); + } + avro_datum_incref(field_value); + st_insert(avro_datum_to_record(datum)->fields_byname, + (st_data_t) key, (st_data_t) field_value); + return 0; +} + +avro_datum_t avro_enum(avro_schema_t schema, int i) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + + struct avro_enum_datum_t *datum = + (struct avro_enum_datum_t *) avro_new(struct avro_enum_datum_t); + if (!datum) { + avro_set_error("Cannot create new enum datum"); + return NULL; + } + datum->schema = avro_schema_incref(schema); + datum->value = i; + + avro_datum_init(&datum->obj, AVRO_ENUM); + return &datum->obj; +} + +int avro_enum_get(const avro_datum_t datum) +{ + return avro_datum_to_enum(datum)->value; +} + +const char *avro_enum_get_name(const avro_datum_t datum) +{ + int value = avro_enum_get(datum); + avro_schema_t schema = avro_datum_to_enum(datum)->schema; + return avro_schema_enum_get(schema, value); +} + +int avro_enum_set(avro_datum_t datum, const int symbol_value) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_enum(datum), "enum datum"); + + avro_datum_to_enum(datum)->value = symbol_value; + return 0; +} + +int avro_enum_set_name(avro_datum_t datum, const char *symbol_name) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_enum(datum), "enum datum"); + check_param(EINVAL, symbol_name, "symbol name"); + + avro_schema_t schema = avro_datum_to_enum(datum)->schema; + int symbol_value = avro_schema_enum_get_by_name(schema, symbol_name); + if (symbol_value == -1) { + avro_set_error("No symbol named %s", symbol_name); + return EINVAL; + } + avro_datum_to_enum(datum)->value = symbol_value; + return 0; +} + +static avro_datum_t avro_fixed_private(avro_schema_t schema, + const char *bytes, const int64_t size, + avro_free_func_t fixed_free) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + struct avro_fixed_schema_t *fschema = avro_schema_to_fixed(schema); + if (size != fschema->size) { + avro_free((char *) bytes, size); + avro_set_error("Fixed size (%zu) doesn't match schema (%zu)", + (size_t) size, (size_t) fschema->size); + return NULL; + } + + struct avro_fixed_datum_t *datum = + (struct avro_fixed_datum_t *) avro_new(struct avro_fixed_datum_t); + if (!datum) { + avro_free((char *) bytes, size); + avro_set_error("Cannot create new fixed datum"); + return NULL; + } + datum->schema = avro_schema_incref(schema); + datum->size = size; + datum->bytes = (char *)bytes; + datum->free = fixed_free; + + avro_datum_init(&datum->obj, AVRO_FIXED); + return &datum->obj; +} + +avro_datum_t avro_fixed(avro_schema_t schema, + const char *bytes, const int64_t size) +{ + char *bytes_copy = (char *) avro_malloc(size); + if (!bytes_copy) { + avro_set_error("Cannot copy fixed content"); + return NULL; + } + memcpy(bytes_copy, bytes, size); + return avro_fixed_private(schema, bytes_copy, size, avro_alloc_free_func); +} + +avro_datum_t avro_givefixed(avro_schema_t schema, + const char *bytes, const int64_t size, + avro_free_func_t free) +{ + return avro_fixed_private(schema, bytes, size, free); +} + +static int avro_fixed_set_private(avro_datum_t datum, + const char *bytes, const int64_t size, + avro_free_func_t fixed_free) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_fixed(datum), "fixed datum"); + + struct avro_fixed_datum_t *fixed = avro_datum_to_fixed(datum); + struct avro_fixed_schema_t *schema = avro_schema_to_fixed(fixed->schema); + if (size != schema->size) { + avro_set_error("Fixed size doesn't match schema"); + return EINVAL; + } + + if (fixed->free) { + fixed->free(fixed->bytes, fixed->size); + } + + fixed->free = fixed_free; + fixed->bytes = (char *)bytes; + fixed->size = size; + return 0; +} + +int avro_fixed_set(avro_datum_t datum, const char *bytes, const int64_t size) +{ + int rval; + char *bytes_copy = (char *) avro_malloc(size); + if (!bytes_copy) { + avro_set_error("Cannot copy fixed content"); + return ENOMEM; + } + memcpy(bytes_copy, bytes, size); + rval = avro_fixed_set_private(datum, bytes_copy, size, avro_alloc_free_func); + if (rval) { + avro_free(bytes_copy, size); + } + return rval; +} + +int avro_givefixed_set(avro_datum_t datum, const char *bytes, + const int64_t size, avro_free_func_t free) +{ + return avro_fixed_set_private(datum, bytes, size, free); +} + +int avro_fixed_get(avro_datum_t datum, char **bytes, int64_t * size) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_fixed(datum), "fixed datum"); + check_param(EINVAL, bytes, "bytes"); + check_param(EINVAL, size, "size"); + + *bytes = avro_datum_to_fixed(datum)->bytes; + *size = avro_datum_to_fixed(datum)->size; + return 0; +} + +static int +avro_init_map(struct avro_map_datum_t *datum) +{ + datum->map = st_init_strtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->map) { + avro_set_error("Cannot create new map datum"); + return ENOMEM; + } + datum->indices_by_key = st_init_strtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->indices_by_key) { + avro_set_error("Cannot create new map datum"); + st_free_table(datum->map); + return ENOMEM; + } + datum->keys_by_index = st_init_numtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->keys_by_index) { + avro_set_error("Cannot create new map datum"); + st_free_table(datum->indices_by_key); + st_free_table(datum->map); + return ENOMEM; + } + return 0; +} + +avro_datum_t avro_map(avro_schema_t schema) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + + struct avro_map_datum_t *datum = + (struct avro_map_datum_t *) avro_new(struct avro_map_datum_t); + if (!datum) { + avro_set_error("Cannot create new map datum"); + return NULL; + } + + if (avro_init_map(datum) != 0) { + avro_freet(struct avro_map_datum_t, datum); + return NULL; + } + + datum->schema = avro_schema_incref(schema); + avro_datum_init(&datum->obj, AVRO_MAP); + return &datum->obj; +} + +size_t +avro_map_size(const avro_datum_t datum) +{ + const struct avro_map_datum_t *map = avro_datum_to_map(datum); + return map->map->num_entries; +} + +int +avro_map_get(const avro_datum_t datum, const char *key, avro_datum_t * value) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_map(datum), "map datum"); + check_param(EINVAL, key, "key"); + check_param(EINVAL, value, "value"); + + union { + avro_datum_t datum; + st_data_t data; + } val; + + struct avro_map_datum_t *map = avro_datum_to_map(datum); + if (st_lookup(map->map, (st_data_t) key, &(val.data))) { + *value = val.datum; + return 0; + } + + avro_set_error("No map element named %s", key); + return EINVAL; +} + +int avro_map_get_key(const avro_datum_t datum, int index, + const char **key) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_map(datum), "map datum"); + check_param(EINVAL, index >= 0, "index"); + check_param(EINVAL, key, "key"); + + union { + st_data_t data; + char *key; + } val; + + struct avro_map_datum_t *map = avro_datum_to_map(datum); + if (st_lookup(map->keys_by_index, (st_data_t) index, &val.data)) { + *key = val.key; + return 0; + } + + avro_set_error("No map element with index %d", index); + return EINVAL; +} + +int avro_map_get_index(const avro_datum_t datum, const char *key, + int *index) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_map(datum), "map datum"); + check_param(EINVAL, key, "key"); + check_param(EINVAL, index, "index"); + + st_data_t data; + + struct avro_map_datum_t *map = avro_datum_to_map(datum); + if (st_lookup(map->indices_by_key, (st_data_t) key, &data)) { + *index = (int) data; + return 0; + } + + avro_set_error("No map element with key %s", key); + return EINVAL; +} + +int +avro_map_set(avro_datum_t datum, const char *key, + const avro_datum_t value) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + check_param(EINVAL, is_avro_map(datum), "map datum"); + check_param(EINVAL, key, "key"); + check_param(EINVAL, is_avro_datum(value), "value"); + + char *save_key = (char *)key; + avro_datum_t old_datum; + + struct avro_map_datum_t *map = avro_datum_to_map(datum); + + if (avro_map_get(datum, key, &old_datum) == 0) { + /* Overwriting an old value */ + avro_datum_decref(old_datum); + } else { + /* Inserting a new value */ + save_key = avro_strdup(key); + if (!save_key) { + avro_set_error("Cannot copy map key"); + return ENOMEM; + } + int new_index = map->map->num_entries; + st_insert(map->indices_by_key, (st_data_t) save_key, + (st_data_t) new_index); + st_insert(map->keys_by_index, (st_data_t) new_index, + (st_data_t) save_key); + } + avro_datum_incref(value); + st_insert(map->map, (st_data_t) save_key, (st_data_t) value); + return 0; +} + +static int +avro_init_array(struct avro_array_datum_t *datum) +{ + datum->els = st_init_numtable_with_size(DEFAULT_TABLE_SIZE); + if (!datum->els) { + avro_set_error("Cannot create new array datum"); + return ENOMEM; + } + return 0; +} + +avro_datum_t avro_array(avro_schema_t schema) +{ + check_param(NULL, is_avro_schema(schema), "schema"); + + struct avro_array_datum_t *datum = + (struct avro_array_datum_t *) avro_new(struct avro_array_datum_t); + if (!datum) { + avro_set_error("Cannot create new array datum"); + return NULL; + } + + if (avro_init_array(datum) != 0) { + avro_freet(struct avro_array_datum_t, datum); + return NULL; + } + + datum->schema = avro_schema_incref(schema); + avro_datum_init(&datum->obj, AVRO_ARRAY); + return &datum->obj; +} + +int +avro_array_get(const avro_datum_t array_datum, int64_t index, avro_datum_t * value) +{ + check_param(EINVAL, is_avro_datum(array_datum), "datum"); + check_param(EINVAL, is_avro_array(array_datum), "array datum"); + check_param(EINVAL, value, "value pointer"); + + union { + st_data_t data; + avro_datum_t datum; + } val; + + const struct avro_array_datum_t * array = avro_datum_to_array(array_datum); + if (st_lookup(array->els, index, &val.data)) { + *value = val.datum; + return 0; + } + + avro_set_error("No array element with index %ld", (long) index); + return EINVAL; +} + +size_t +avro_array_size(const avro_datum_t datum) +{ + const struct avro_array_datum_t *array = avro_datum_to_array(datum); + return array->els->num_entries; +} + +int +avro_array_append_datum(avro_datum_t array_datum, + const avro_datum_t datum) +{ + check_param(EINVAL, is_avro_datum(array_datum), "datum"); + check_param(EINVAL, is_avro_array(array_datum), "array datum"); + check_param(EINVAL, is_avro_datum(datum), "element datum"); + + struct avro_array_datum_t *array = avro_datum_to_array(array_datum); + st_insert(array->els, array->els->num_entries, + (st_data_t) avro_datum_incref(datum)); + return 0; +} + +static int char_datum_free_foreach(char *key, avro_datum_t datum, void *arg) +{ + AVRO_UNUSED(arg); + + avro_datum_decref(datum); + avro_str_free(key); + return ST_DELETE; +} + +static int array_free_foreach(int i, avro_datum_t datum, void *arg) +{ + AVRO_UNUSED(i); + AVRO_UNUSED(arg); + + avro_datum_decref(datum); + return ST_DELETE; +} + +avro_schema_t avro_datum_get_schema(const avro_datum_t datum) +{ + check_param(NULL, is_avro_datum(datum), "datum"); + + switch (avro_typeof(datum)) { + /* + * For the primitive types, which don't store an + * explicit reference to their schema, we decref the + * schema before returning. This maintains the + * invariant that this function doesn't add any + * additional references to the schema. The primitive + * schemas won't be freed, because there's always at + * least 1 reference for their initial static + * initializers. + */ + + case AVRO_STRING: + { + avro_schema_t result = avro_schema_string(); + avro_schema_decref(result); + return result; + } + case AVRO_BYTES: + { + avro_schema_t result = avro_schema_bytes(); + avro_schema_decref(result); + return result; + } + case AVRO_INT32: + { + avro_schema_t result = avro_schema_int(); + avro_schema_decref(result); + return result; + } + case AVRO_INT64: + { + avro_schema_t result = avro_schema_long(); + avro_schema_decref(result); + return result; + } + case AVRO_FLOAT: + { + avro_schema_t result = avro_schema_float(); + avro_schema_decref(result); + return result; + } + case AVRO_DOUBLE: + { + avro_schema_t result = avro_schema_double(); + avro_schema_decref(result); + return result; + } + case AVRO_BOOLEAN: + { + avro_schema_t result = avro_schema_boolean(); + avro_schema_decref(result); + return result; + } + case AVRO_NULL: + { + avro_schema_t result = avro_schema_null(); + avro_schema_decref(result); + return result; + } + + case AVRO_RECORD: + return avro_datum_to_record(datum)->schema; + case AVRO_ENUM: + return avro_datum_to_enum(datum)->schema; + case AVRO_FIXED: + return avro_datum_to_fixed(datum)->schema; + case AVRO_MAP: + return avro_datum_to_map(datum)->schema; + case AVRO_ARRAY: + return avro_datum_to_array(datum)->schema; + case AVRO_UNION: + return avro_datum_to_union(datum)->schema; + + default: + return NULL; + } +} + +static void avro_datum_free(avro_datum_t datum) +{ + if (is_avro_datum(datum)) { + switch (avro_typeof(datum)) { + case AVRO_STRING:{ + struct avro_string_datum_t *string; + string = avro_datum_to_string(datum); + if (string->free) { + string->free(string->s, string->size); + } + avro_freet(struct avro_string_datum_t, string); + } + break; + case AVRO_BYTES:{ + struct avro_bytes_datum_t *bytes; + bytes = avro_datum_to_bytes(datum); + if (bytes->free) { + bytes->free(bytes->bytes, bytes->size); + } + avro_freet(struct avro_bytes_datum_t, bytes); + } + break; + case AVRO_INT32:{ + avro_freet(struct avro_int32_datum_t, datum); + } + break; + case AVRO_INT64:{ + avro_freet(struct avro_int64_datum_t, datum); + } + break; + case AVRO_FLOAT:{ + avro_freet(struct avro_float_datum_t, datum); + } + break; + case AVRO_DOUBLE:{ + avro_freet(struct avro_double_datum_t, datum); + } + break; + case AVRO_BOOLEAN:{ + avro_freet(struct avro_boolean_datum_t, datum); + } + break; + case AVRO_NULL: + /* Nothing allocated */ + break; + + case AVRO_RECORD:{ + struct avro_record_datum_t *record; + record = avro_datum_to_record(datum); + avro_schema_decref(record->schema); + st_foreach(record->fields_byname, + HASH_FUNCTION_CAST char_datum_free_foreach, 0); + st_free_table(record->field_order); + st_free_table(record->fields_byname); + avro_freet(struct avro_record_datum_t, record); + } + break; + case AVRO_ENUM:{ + struct avro_enum_datum_t *enump; + enump = avro_datum_to_enum(datum); + avro_schema_decref(enump->schema); + avro_freet(struct avro_enum_datum_t, enump); + } + break; + case AVRO_FIXED:{ + struct avro_fixed_datum_t *fixed; + fixed = avro_datum_to_fixed(datum); + avro_schema_decref(fixed->schema); + if (fixed->free) { + fixed->free((void *)fixed->bytes, + fixed->size); + } + avro_freet(struct avro_fixed_datum_t, fixed); + } + break; + case AVRO_MAP:{ + struct avro_map_datum_t *map; + map = avro_datum_to_map(datum); + avro_schema_decref(map->schema); + st_foreach(map->map, HASH_FUNCTION_CAST char_datum_free_foreach, + 0); + st_free_table(map->map); + st_free_table(map->indices_by_key); + st_free_table(map->keys_by_index); + avro_freet(struct avro_map_datum_t, map); + } + break; + case AVRO_ARRAY:{ + struct avro_array_datum_t *array; + array = avro_datum_to_array(datum); + avro_schema_decref(array->schema); + st_foreach(array->els, HASH_FUNCTION_CAST array_free_foreach, 0); + st_free_table(array->els); + avro_freet(struct avro_array_datum_t, array); + } + break; + case AVRO_UNION:{ + struct avro_union_datum_t *unionp; + unionp = avro_datum_to_union(datum); + avro_schema_decref(unionp->schema); + avro_datum_decref(unionp->value); + avro_freet(struct avro_union_datum_t, unionp); + } + break; + case AVRO_LINK:{ + /* TODO */ + } + break; + } + } +} + +static int +datum_reset_foreach(int i, avro_datum_t datum, void *arg) +{ + AVRO_UNUSED(i); + int rval; + int *result = (int *) arg; + + rval = avro_datum_reset(datum); + if (rval == 0) { + return ST_CONTINUE; + } else { + *result = rval; + return ST_STOP; + } +} + +int +avro_datum_reset(avro_datum_t datum) +{ + check_param(EINVAL, is_avro_datum(datum), "datum"); + int rval; + + switch (avro_typeof(datum)) { + case AVRO_ARRAY: + { + struct avro_array_datum_t *array; + array = avro_datum_to_array(datum); + st_foreach(array->els, HASH_FUNCTION_CAST array_free_foreach, 0); + st_free_table(array->els); + + rval = avro_init_array(array); + if (rval != 0) { + avro_freet(struct avro_array_datum_t, array); + return rval; + } + return 0; + } + + case AVRO_MAP: + { + struct avro_map_datum_t *map; + map = avro_datum_to_map(datum); + st_foreach(map->map, HASH_FUNCTION_CAST char_datum_free_foreach, 0); + st_free_table(map->map); + st_free_table(map->indices_by_key); + st_free_table(map->keys_by_index); + + rval = avro_init_map(map); + if (rval != 0) { + avro_freet(struct avro_map_datum_t, map); + return rval; + } + return 0; + } + + case AVRO_RECORD: + { + struct avro_record_datum_t *record; + record = avro_datum_to_record(datum); + rval = 0; + st_foreach(record->fields_byname, + HASH_FUNCTION_CAST datum_reset_foreach, (st_data_t) &rval); + return rval; + } + + case AVRO_UNION: + { + struct avro_union_datum_t *unionp; + unionp = avro_datum_to_union(datum); + return (unionp->value == NULL)? 0: + avro_datum_reset(unionp->value); + } + + default: + return 0; + } +} + +avro_datum_t avro_datum_incref(avro_datum_t datum) +{ + if (datum) { + avro_refcount_inc(&datum->refcount); + } + return datum; +} + +void avro_datum_decref(avro_datum_t datum) +{ + if (datum && avro_refcount_dec(&datum->refcount)) { + avro_datum_free(datum); + } +} + +void avro_datum_print(avro_datum_t value, FILE * fp) +{ + AVRO_UNUSED(value); + AVRO_UNUSED(fp); +} |