diff options
Diffstat (limited to 'src/fluent-bit/plugins/processor_attributes')
3 files changed, 2038 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt b/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt new file mode 100644 index 000000000..db01390ed --- /dev/null +++ b/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + attributes.c) + +FLB_PLUGIN(processor_attributes "${src}" "") diff --git a/src/fluent-bit/plugins/processor_attributes/attributes.c b/src/fluent-bit/plugins/processor_attributes/attributes.c new file mode 100644 index 000000000..a59c07ae2 --- /dev/null +++ b/src/fluent-bit/plugins/processor_attributes/attributes.c @@ -0,0 +1,1408 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <math.h> + +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_processor_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_hash.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_processor.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <cmetrics/cmetrics.h> +#include <cmetrics/cmt_histogram.h> +#include <cmetrics/cmt_summary.h> +#include <cmetrics/cmt_untyped.h> +#include <cmetrics/cmt_counter.h> +#include <cmetrics/cmt_gauge.h> +#include <cmetrics/cmt_map.h> + +#include <cfl/cfl.h> + +#include "variant_utils.h" + +typedef int (*attribute_transformer)(void *, struct cfl_variant *value); + +struct internal_processor_context { + struct mk_list *update_list; + struct mk_list *insert_list; + struct mk_list *upsert_list; + struct mk_list *convert_list; + struct mk_list *extract_list; + struct mk_list *delete_list; + struct mk_list *hash_list; + + /* internal attributes ready to append */ + struct cfl_list update_attributes; + struct cfl_list insert_attributes; + struct cfl_list upsert_attributes; + struct cfl_list convert_attributes; + struct cfl_list extract_attributes; + struct mk_list delete_attributes; + struct mk_list hash_attributes; + + struct flb_processor_instance *instance; + struct flb_config *config; +}; + +/* + * LOCAL + */ +static int hex_encode(unsigned char *input_buffer, + size_t input_length, + cfl_sds_t *output_buffer) +{ + const char hex[] = "0123456789abcdef"; + cfl_sds_t result; + size_t index; + + if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) { + result = cfl_sds_increase(*output_buffer, + (input_length * 2) - + cfl_sds_alloc(*output_buffer)); + + if (result == NULL) { + return FLB_FALSE; + } + + *output_buffer = result; + } + + for (index = 0; index < input_length; index++) { + (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF]; + (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF]; + } + + cfl_sds_set_len(*output_buffer, input_length * 2); + + (*output_buffer)[index * 2] = '\0'; + + return FLB_TRUE; +} + +static int process_attribute_modification_list_setting( + struct flb_processor_instance *plugin_instance, + const char *setting_name, + struct mk_list *source_list, + struct mk_list *destination_list) +{ + struct flb_config_map_val *source_entry; + struct mk_list *iterator; + int result; + + if (source_list == NULL || + mk_list_is_empty(source_list) == 0) { + + return 0; + } + + flb_config_map_foreach(iterator, source_entry, source_list) { + result = flb_slist_add(destination_list, source_entry->val.str); + + if (result != 0) { + flb_plg_error(plugin_instance, + "could not append attribute name %s\n", + source_entry->val.str); + + return -1; + } + } + + return 0; +} + +static int process_attribute_modification_kvlist_setting( + struct flb_processor_instance *plugin_instance, + const char *setting_name, + struct mk_list *source_list, + struct cfl_list *destination_list) +{ + struct cfl_kv *processed_pair; + struct flb_config_map_val *source_entry; + struct mk_list *iterator; + struct flb_slist_entry *value; + struct flb_slist_entry *key; + + if (source_list == NULL || + mk_list_is_empty(source_list) == 0) { + + return 0; + } + + flb_config_map_foreach(iterator, source_entry, source_list) { + if (mk_list_size(source_entry->val.list) != 2) { + flb_plg_error(plugin_instance, + "'%s' expects a key and a value, " + "e.g: '%s version 1.8.0'", + setting_name, setting_name); + + return -1; + } + + key = mk_list_entry_first(source_entry->val.list, + struct flb_slist_entry, _head); + + value = mk_list_entry_last(source_entry->val.list, + struct flb_slist_entry, _head); + + processed_pair = cfl_kv_item_create(destination_list, + key->str, + value->str); + + if (processed_pair == NULL) { + flb_plg_error(plugin_instance, + "could not append attribute %s=%s\n", + key->str, + value->str); + + return -1; + } + } + + return 0; +} + +static void destroy_context(struct internal_processor_context *context) +{ + if (context != NULL) { + cfl_kv_release(&context->update_attributes); + cfl_kv_release(&context->insert_attributes); + cfl_kv_release(&context->upsert_attributes); + cfl_kv_release(&context->convert_attributes); + cfl_kv_release(&context->extract_attributes); + flb_slist_destroy(&context->delete_attributes); + flb_slist_destroy(&context->hash_attributes); + + flb_free(context); + } +} + +static struct internal_processor_context * + create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + struct internal_processor_context *context; + int result; + + context = flb_calloc(1, sizeof(struct internal_processor_context)); + + if (context != NULL) { + context->instance = processor_instance; + context->config = config; + + cfl_kv_init(&context->update_attributes); + cfl_kv_init(&context->insert_attributes); + cfl_kv_init(&context->upsert_attributes); + cfl_kv_init(&context->convert_attributes); + cfl_kv_init(&context->extract_attributes); + flb_slist_create(&context->delete_attributes); + flb_slist_create(&context->hash_attributes); + + result = flb_processor_instance_config_map_set(processor_instance, (void *) context); + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "update", + context->update_list, + &context->update_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "insert", + context->insert_list, + &context->insert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "convert", + context->convert_list, + &context->convert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "extract", + context->extract_list, + &context->extract_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "upsert", + context->upsert_list, + &context->upsert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_list_setting( + processor_instance, + "delete", + context->delete_list, + &context->delete_attributes); + } + + if (result == 0) { + result = process_attribute_modification_list_setting( + processor_instance, + "hash", + context->hash_list, + &context->hash_attributes); + } + + if (result != 0) { + destroy_context(context); + + context = NULL; + } + } + else { + flb_errno(); + } + + return context; +} + +static int cb_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + processor_instance->context = (void *) create_context( + processor_instance, config); + + if (processor_instance->context == NULL) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + + +static int cb_exit(struct flb_processor_instance *processor_instance) +{ + if (processor_instance != NULL && + processor_instance->context != NULL) { + destroy_context(processor_instance->context); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cfl_kvlist_contains(struct cfl_kvlist *kvlist, + char *name) +{ + struct cfl_list *iterator; + struct cfl_kvpair *pair; + + cfl_list_foreach(iterator, &kvlist->list) { + pair = cfl_list_entry(iterator, + struct cfl_kvpair, _head); + + if (strcasecmp(pair->key, name) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static void cfl_kvpair_destroy(struct cfl_kvpair *pair) +{ + if (pair != NULL) { + if (!cfl_list_entry_is_orphan(&pair->_head)) { + cfl_list_del(&pair->_head); + } + + if (pair->key != NULL) { + cfl_sds_destroy(pair->key); + } + + if (pair->val != NULL) { + cfl_variant_destroy(pair->val); + } + + free(pair); + } +} + +static int cfl_kvlist_remove(struct cfl_kvlist *kvlist, + char *name) +{ + struct cfl_list *iterator_backup; + struct cfl_list *iterator; + struct cfl_kvpair *pair; + + cfl_list_foreach_safe(iterator, iterator_backup, &kvlist->list) { + pair = cfl_list_entry(iterator, + struct cfl_kvpair, _head); + + if (strcasecmp(pair->key, name) == 0) { + cfl_kvpair_destroy(pair); + } + } + + return FLB_TRUE; +} + + +/* local declarations */ + + +static cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value) +{ + cfl_sds_t json_result; + mpack_writer_t writer; + char *data; + size_t size; + + data = NULL; + size = 0; + + mpack_writer_init_growable(&writer, &data, &size); + + pack_cfl_variant(&writer, value); + + mpack_writer_destroy(&writer); + + json_result = flb_msgpack_raw_to_json_sds(data, size); + + return json_result; +} + + + +static int cfl_variant_convert(struct cfl_variant *input_value, + struct cfl_variant **output_value, + int output_type) +{ + char *converstion_canary; + struct cfl_variant temporary_value; + int errno_backup; + + errno_backup = errno; + *output_value = cfl_variant_create(); + + memset(&temporary_value, 0, sizeof(struct cfl_variant)); + + temporary_value.type = output_type; + + if (input_value->type == CFL_VARIANT_STRING || + input_value->type == CFL_VARIANT_BYTES || + input_value->type == CFL_VARIANT_REFERENCE) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = + cfl_sds_create_len( + input_value->data.as_string, + cfl_sds_len(input_value->data.as_string)); + + if (temporary_value.data.as_string == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (strcasecmp(input_value->data.as_string, "true") == 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + else if (strcasecmp(input_value->data.as_string, "off") == 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + errno = 0; + temporary_value.data.as_int64 = strtoimax(input_value->data.as_string, + &converstion_canary, + 10); + + if (errno == ERANGE || errno == EINVAL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_DOUBLE) { + errno = 0; + converstion_canary = NULL; + temporary_value.data.as_double = strtod(input_value->data.as_string, + &converstion_canary); + + if (errno == ERANGE) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + else if (temporary_value.data.as_double == 0 && + converstion_canary == input_value->data.as_string) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_bytes(temporary_value.data.as_array, + input_value->data.as_bytes, + cfl_sds_len(input_value->data.as_bytes)) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + temporary_value.data.as_array->entries[0]->type = output_type; + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_INT) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_sds_create_size(64); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + + /* We need to fix the wesleys truncation PR to cfl */ + converstion_canary = (char *) cfl_sds_printf( + &temporary_value.data.as_string, + "%" PRIi64, + input_value->data.as_int64); + + if (converstion_canary == NULL) { + cfl_sds_destroy(temporary_value.data.as_string); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (input_value->data.as_int64 != 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + temporary_value.data.as_int64 = input_value->data.as_int64; + } + else if (output_type == CFL_VARIANT_DOUBLE) { + temporary_value.data.as_double = (double) input_value->data.as_int64; + + /* This conversion could be lossy, we need to determine what we want to + * do in that case + */ + if ((int64_t) temporary_value.data.as_double != input_value->data.as_int64) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_int64(temporary_value.data.as_array, + input_value->data.as_int64) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_DOUBLE) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_sds_create_size(64); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + + /* We need to fix the wesleys truncation PR to cfl */ + converstion_canary = (char *) cfl_sds_printf( + &temporary_value.data.as_string, + "%.17g", + input_value->data.as_double); + + if (converstion_canary == NULL) { + cfl_sds_destroy(temporary_value.data.as_string); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (input_value->data.as_double != 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + temporary_value.data.as_int64 = (int64_t) round(input_value->data.as_double); + } + else if (output_type == CFL_VARIANT_DOUBLE) { + temporary_value.data.as_double = input_value->data.as_int64; + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_double(temporary_value.data.as_array, + input_value->data.as_double) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_KVLIST) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_ARRAY) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + + memcpy(*output_value, &temporary_value, sizeof(struct cfl_variant)); + + return FLB_TRUE; +} + +static int span_contains_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_contains(span->attr->kv, name); +} + +static int span_remove_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_remove(span->attr->kv, name); +} + +static int span_update_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + cfl_kvlist_remove(span->attr->kv, name); + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_insert_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_transform_attribute(struct ctrace_span *span, + char *name, + attribute_transformer transformer) +{ + struct cfl_variant *attribute; + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + return FLB_FALSE; + } + + return transformer(NULL, attribute); +} + +static int span_convert_attribute(struct ctrace_span *span, + char *name, + char *new_type) +{ + struct cfl_variant *converted_attribute; + int new_type_constant; + struct cfl_variant *attribute; + int result; + + if (strcasecmp(new_type, "string") == 0 || + strcasecmp(new_type, "str") == 0) { + new_type_constant = CFL_VARIANT_STRING; + } + else if (strcasecmp(new_type, "bytes") == 0) { + new_type_constant = CFL_VARIANT_BYTES; + } + else if (strcasecmp(new_type, "boolean") == 0 || + strcasecmp(new_type, "bool") == 0) { + new_type_constant = CFL_VARIANT_BOOL; + } + else if (strcasecmp(new_type, "integer") == 0 || + strcasecmp(new_type, "int64") == 0 || + strcasecmp(new_type, "int") == 0) { + new_type_constant = CFL_VARIANT_INT; + } + else if (strcasecmp(new_type, "double") == 0 || + strcasecmp(new_type, "dbl") == 0) { + new_type_constant = CFL_VARIANT_DOUBLE; + } + else if (strcasecmp(new_type, "array") == 0) { + new_type_constant = CFL_VARIANT_ARRAY; + } + else { + return FLB_FALSE; + } + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(attribute, + &converted_attribute, + new_type_constant); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + result = cfl_kvlist_remove(span->attr->kv, name); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + + result = cfl_kvlist_insert(span->attr->kv, name, converted_attribute); + + if (result != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static void attribute_match_cb(const char *name, + const char *value, + size_t value_length, + void *context) +{ + cfl_sds_t temporary_value; + struct ctrace_span *span; + + temporary_value = cfl_sds_create_len(value, value_length); + + if (temporary_value != NULL) { + span = (struct ctrace_span *) context; + + if (span_contains_attribute(span, name) == FLB_TRUE) { + span_remove_attribute(span, name); + } + + ctr_span_set_attribute_string(span, name, temporary_value); + + cfl_sds_destroy(temporary_value); + } +} + +static int span_extract_attributes(struct ctrace_span *span, + char *name, + char *pattern) +{ + ssize_t match_count; + struct flb_regex_search match_list; + struct cfl_variant *attribute; + int result; + struct flb_regex *regex; + + regex = flb_regex_create(pattern); + + if (regex == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + + if (attribute->type != CFL_VARIANT_STRING) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + match_count = flb_regex_do(regex, + attribute->data.as_string, + cfl_sds_len(attribute->data.as_string), + &match_list); + + if (match_count <= 0) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + + result = flb_regex_parse(regex, + &match_list, + attribute_match_cb, + (void *) span); + + flb_regex_destroy(regex); + + if (result == -1) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int traces_context_contains_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int hash_transformer(void *context, struct cfl_variant *value) +{ + unsigned char digest_buffer[32]; + struct cfl_variant *converted_value; + cfl_sds_t encoded_hash; + int result; + + if (value == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(value, + &converted_value, + CFL_VARIANT_STRING); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + if (cfl_sds_len(converted_value->data.as_string) == 0) { + cfl_variant_destroy(converted_value); + + return FLB_TRUE; + } + + result = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) converted_value->data.as_string, + cfl_sds_len(converted_value->data.as_string), + digest_buffer, + sizeof(digest_buffer)); + + if (result != FLB_CRYPTO_SUCCESS) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + result = hex_encode(digest_buffer, + sizeof(digest_buffer), + &converted_value->data.as_string); + + if (result != FLB_TRUE) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + encoded_hash = cfl_sds_create(converted_value->data.as_string); + + if (encoded_hash == NULL) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + if (value->type == CFL_VARIANT_STRING || + value->type == CFL_VARIANT_BYTES) { + cfl_sds_destroy(value->data.as_string); + } + else if (value->type == CFL_VARIANT_ARRAY) { + cfl_array_destroy(value->data.as_array); + } + else if (value->type == CFL_VARIANT_KVLIST) { + cfl_kvlist_destroy(value->data.as_kvlist); + } + + value->type = CFL_VARIANT_STRING; + value->data.as_string = encoded_hash; + + return FLB_TRUE; +} + +static int traces_context_hash_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_transform_attribute(span, name, hash_transformer) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_remove_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_remove_attribute(span, name) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_update_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_insert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (!span_contains_attribute(span, name) == FLB_TRUE) { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_upsert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + else { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_convert_attribute(struct ctrace *traces_context, + char *name, + char *new_type) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_convert_attribute(span, name, new_type) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_extract_attribute(struct ctrace *traces_context, + char *name, + char *pattern) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_extract_attributes(span, name, pattern) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int delete_attributes(struct ctrace *traces_context, + struct mk_list *attributes) +{ + struct mk_list *iterator; + int result; + struct flb_slist_entry *entry; + + mk_list_foreach(iterator, attributes) { + entry = mk_list_entry(iterator, struct flb_slist_entry, _head); + + result = traces_context_contains_attribute(traces_context, + entry->str); + + if (result == FLB_TRUE) { + result = traces_context_remove_attribute(traces_context, + entry->str); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int update_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_update_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int upsert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_upsert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int convert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_convert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int extract_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_extract_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int insert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_insert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int hash_attributes(struct ctrace *traces_context, + struct mk_list *attributes) +{ + struct mk_list *iterator; + int result; + struct flb_slist_entry *entry; + + mk_list_foreach(iterator, attributes) { + entry = mk_list_entry(iterator, struct flb_slist_entry, _head); + + result = traces_context_contains_attribute(traces_context, + entry->str); + + if (result == FLB_TRUE) { + result = traces_context_hash_attribute(traces_context, + entry->str); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_process_traces(struct flb_processor_instance *processor_instance, + struct ctrace *traces_context, + const char *tag, + int tag_len) +{ + struct internal_processor_context *processor_context; + int result; + + processor_context = + (struct internal_processor_context *) processor_instance->context; + + result = delete_attributes(traces_context, + &processor_context->delete_attributes); + + if (result == FLB_PROCESSOR_SUCCESS) { + result = update_attributes(traces_context, + &processor_context->update_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = upsert_attributes(traces_context, + &processor_context->upsert_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = insert_attributes(traces_context, + &processor_context->insert_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = convert_attributes(traces_context, + &processor_context->convert_attributes); + result = FLB_PROCESSOR_SUCCESS; + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = extract_attributes(traces_context, + &processor_context->extract_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = hash_attributes(traces_context, + &processor_context->hash_attributes); + } + + if (result != FLB_PROCESSOR_SUCCESS) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SLIST_1, "update", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + update_list), + "Updates an attribute. Usage : 'update name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "insert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + insert_list), + "Inserts an attribute. Usage : 'insert name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "upsert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + upsert_list), + "Inserts or updates an attribute. Usage : 'upsert name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "convert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + convert_list), + "Converts an attribute. Usage : 'convert name new_type'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "extract", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + extract_list), + "Extracts regular expression match groups as individual attributes. Usage : 'extract (?P<first_word>[^ ]*) (?P<second_word>[^ ]*)'" + }, + { + FLB_CONFIG_MAP_STR, "delete", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + delete_list), + "Deletes an attribute. Usage : 'delete name'" + }, + { + FLB_CONFIG_MAP_STR, "hash", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + hash_list), + "Replaces an attributes value with its SHA256 hash. Usage : 'hash name'" + }, + + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_attributes_plugin = { + .name = "attributes", + .description = "Modifies metrics attributes", + .cb_init = cb_init, + .cb_process_logs = NULL, + .cb_process_metrics = NULL, + .cb_process_traces = cb_process_traces, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/src/fluent-bit/plugins/processor_attributes/variant_utils.h b/src/fluent-bit/plugins/processor_attributes/variant_utils.h new file mode 100644 index 000000000..7ba376273 --- /dev/null +++ b/src/fluent-bit/plugins/processor_attributes/variant_utils.h @@ -0,0 +1,626 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* CMetrics + * ======== + * Copyright 2021-2022 The CMetrics Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef VARIANT_UTILS_H +#define VARIANT_UTILS_H + +#include <mpack/mpack.h> + +/* These are the only functions meant for general use, + * the reason why the kvlist packing and unpacking + * functions are exposed is the internal and external + * metadata kvlists in the cmetrics context are not + * contained by a variant instance. + * + * Result : + * Upon success all of these return 0, otherwise they will + * raise the innermost error code which should be treated + * as an opaque value. + * + * Notes : + * When decoding -1 means the check after mpack_read_tag + * failed and -2 means the type was not the one expected + */ + +static inline int pack_cfl_variant(mpack_writer_t *writer, + struct cfl_variant *value); + +static inline int pack_cfl_variant_kvlist(mpack_writer_t *writer, + struct cfl_kvlist *kvlist); + +static inline int unpack_cfl_variant(mpack_reader_t *reader, + struct cfl_variant **value); + +static inline int unpack_cfl_kvlist(mpack_reader_t *reader, + struct cfl_kvlist **result_kvlist); + +/* Packers */ +static inline int pack_cfl_variant_string(mpack_writer_t *writer, + char *value) +{ + mpack_write_cstr(writer, value); + + return 0; +} + +static inline int pack_cfl_variant_binary(mpack_writer_t *writer, + char *value, + size_t length) +{ + mpack_write_bin(writer, value, length); + + return 0; +} + +static inline int pack_cfl_variant_boolean(mpack_writer_t *writer, + unsigned int value) +{ + mpack_write_bool(writer, value); + + return 0; +} + +static inline int pack_cfl_variant_int64(mpack_writer_t *writer, + int64_t value) +{ + mpack_write_int(writer, value); + + return 0; +} + +static inline int pack_cfl_variant_double(mpack_writer_t *writer, + double value) +{ + mpack_write_double(writer, value); + + return 0; +} + +static inline int pack_cfl_variant_array(mpack_writer_t *writer, + struct cfl_array *array) +{ + size_t entry_count; + struct cfl_variant *entry_value; + int result; + size_t index; + + entry_count = array->entry_count; + + mpack_start_array(writer, entry_count); + + for (index = 0 ; index < entry_count ; index++) { + entry_value = cfl_array_fetch_by_index(array, index); + + if (entry_value == NULL) { + return -1; + } + + result = pack_cfl_variant(writer, entry_value); + + if (result != 0) { + return result; + } + } + + mpack_finish_array(writer); + + return 0; +} + +static inline int pack_cfl_variant_kvlist(mpack_writer_t *writer, + struct cfl_kvlist *kvlist) { + size_t entry_count; + struct cfl_list *iterator; + struct cfl_kvpair *kvpair; + int result; + + entry_count = cfl_kvlist_count(kvlist); + + mpack_start_map(writer, entry_count); + + cfl_list_foreach(iterator, &kvlist->list) { + kvpair = cfl_list_entry(iterator, struct cfl_kvpair, _head); + + mpack_write_cstr(writer, kvpair->key); + + result = pack_cfl_variant(writer, kvpair->val); + + if (result != 0) { + return result; + } + } + + mpack_finish_map(writer); + + return 0; +} + +static inline int pack_cfl_variant(mpack_writer_t *writer, + struct cfl_variant *value) +{ + int result; + + if (value->type == CFL_VARIANT_STRING) { + result = pack_cfl_variant_string(writer, value->data.as_string); + } + else if (value->type == CFL_VARIANT_BOOL) { + result = pack_cfl_variant_boolean(writer, value->data.as_bool); + } + else if (value->type == CFL_VARIANT_INT) { + result = pack_cfl_variant_int64(writer, value->data.as_int64); + } + else if (value->type == CFL_VARIANT_DOUBLE) { + result = pack_cfl_variant_double(writer, value->data.as_double); + } + else if (value->type == CFL_VARIANT_ARRAY) { + result = pack_cfl_variant_array(writer, value->data.as_array); + } + else if (value->type == CFL_VARIANT_KVLIST) { + result = pack_cfl_variant_kvlist(writer, value->data.as_kvlist); + } + else if (value->type == CFL_VARIANT_BYTES) { + result = pack_cfl_variant_binary(writer, + value->data.as_bytes, + cfl_sds_len(value->data.as_bytes)); + } + else if (value->type == CFL_VARIANT_REFERENCE) { + result = pack_cfl_variant_string(writer, value->data.as_string); + } + else { + result = -1; + } + + return result; +} + +/* Unpackers */ + +static inline int unpack_cfl_variant_read_tag(mpack_reader_t *reader, + mpack_tag_t *tag, + mpack_type_t expected_type) +{ + *tag = mpack_read_tag(reader); + + if (mpack_ok != mpack_reader_error(reader)) { + return -1; + } + + if (mpack_tag_type(tag) != expected_type) { + return -2; + } + + return 0; +} + +static inline int unpack_cfl_array(mpack_reader_t *reader, + struct cfl_array **result_array) +{ + struct cfl_array *internal_array; + size_t entry_count; + struct cfl_variant *entry_value; + int result; + size_t index; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_array); + + if (result != 0) { + return result; + } + + entry_count = mpack_tag_array_count(&tag); + + internal_array = cfl_array_create(entry_count); + + if (internal_array == NULL) { + return -3; + } + + for (index = 0 ; index < entry_count ; index++) { + result = unpack_cfl_variant(reader, &entry_value); + + if (result != 0) { + cfl_array_destroy(internal_array); + + return -4; + } + + result = cfl_array_append(internal_array, entry_value); + + if (result != 0) { + cfl_array_destroy(internal_array); + + return -5; + } + } + + mpack_done_array(reader); + + if (mpack_reader_error(reader) != mpack_ok) { + cfl_array_destroy(internal_array); + + return -6; + } + + *result_array = internal_array; + + return 0; +} + +static inline int unpack_cfl_kvlist(mpack_reader_t *reader, + struct cfl_kvlist **result_kvlist) +{ + struct cfl_kvlist *internal_kvlist; + char key_name[256]; + size_t entry_count; + size_t key_length; + struct cfl_variant *key_value; + mpack_tag_t key_tag; + int result; + size_t index; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_map); + + if (result != 0) { + return result; + } + + entry_count = mpack_tag_map_count(&tag); + + internal_kvlist = cfl_kvlist_create(); + + if (internal_kvlist == NULL) { + return -3; + } + + result = 0; + key_value = NULL; + + for (index = 0 ; index < entry_count ; index++) { + result = unpack_cfl_variant_read_tag(reader, &key_tag, mpack_type_str); + + if (result != 0) { + result = -4; + + break; + } + + key_length = mpack_tag_str_length(&key_tag); + + if (key_length >= sizeof(key_name)) { + result = -5; + + break; + } + + mpack_read_cstr(reader, key_name, sizeof(key_name), key_length); + + key_name[key_length] = '\0'; + + mpack_done_str(reader); + + if (mpack_ok != mpack_reader_error(reader)) { + result = -6; + + break; + } + + result = unpack_cfl_variant(reader, &key_value); + + if (result != 0) { + result = -7; + + break; + } + + result = cfl_kvlist_insert(internal_kvlist, key_name, key_value); + + if (result != 0) { + result = -8; + + break; + } + + key_value = NULL; + } + + mpack_done_map(reader); + + if (mpack_reader_error(reader) != mpack_ok) { + result = -9; + } + + if (result != 0) { + cfl_kvlist_destroy(internal_kvlist); + + if (key_value != NULL) { + cfl_variant_destroy(key_value); + } + } + else { + *result_kvlist = internal_kvlist; + } + + return result; +} + +static inline int unpack_cfl_variant_string(mpack_reader_t *reader, + struct cfl_variant **value) +{ + size_t value_length; + char *value_data; + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_str); + + if (result != 0) { + return result; + } + + value_length = mpack_tag_str_length(&tag); + + value_data = cfl_sds_create_size(value_length + 1); + + if (value_data == NULL) { + return -3; + } + + cfl_sds_set_len(value_data, value_length); + + mpack_read_cstr(reader, value_data, value_length + 1, value_length); + + mpack_done_str(reader); + + if (mpack_reader_error(reader) != mpack_ok) { + cfl_sds_destroy(value_data); + + return -4; + } + + *value = cfl_variant_create_from_reference(value_data); + + if (*value == NULL) { + return -5; + } + + (*value)->type = CFL_VARIANT_STRING; + + return 0; +} + +static inline int unpack_cfl_variant_binary(mpack_reader_t *reader, + struct cfl_variant **value) +{ + size_t value_length; + char *value_data; + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_bin); + + if (result != 0) { + return result; + } + + value_length = mpack_tag_bin_length(&tag); + + value_data = cfl_sds_create_size(value_length); + + if (value_data == NULL) { + return -3; + } + + cfl_sds_set_len(value_data, value_length); + + mpack_read_bytes(reader, value_data, value_length); + + mpack_done_bin(reader); + + if (mpack_reader_error(reader) != mpack_ok) { + cfl_sds_destroy(value_data); + + return -4; + } + + *value = cfl_variant_create_from_reference(value_data); + + if (*value == NULL) { + return -5; + } + + (*value)->type = CFL_VARIANT_BYTES; + + return 0; +} + +static inline int unpack_cfl_variant_boolean(mpack_reader_t *reader, + struct cfl_variant **value) +{ + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_bool); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_bool((unsigned int) mpack_tag_bool_value(&tag)); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant_uint64(mpack_reader_t *reader, + struct cfl_variant **value) +{ + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_uint); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_int64((int64_t) mpack_tag_uint_value(&tag)); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant_int64(mpack_reader_t *reader, + struct cfl_variant **value) +{ + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_int); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_int64((int64_t) mpack_tag_int_value(&tag)); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant_double(mpack_reader_t *reader, + struct cfl_variant **value) +{ + int result; + mpack_tag_t tag; + + result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_double); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_double(mpack_tag_double_value(&tag)); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant_array(mpack_reader_t *reader, + struct cfl_variant **value) +{ + struct cfl_array *unpacked_array; + int result; + + result = unpack_cfl_array(reader, &unpacked_array); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_array(unpacked_array); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant_kvlist(mpack_reader_t *reader, + struct cfl_variant **value) +{ + struct cfl_kvlist *unpacked_kvlist; + int result; + + result = unpack_cfl_kvlist(reader, &unpacked_kvlist); + + if (result != 0) { + return result; + } + + *value = cfl_variant_create_from_kvlist(unpacked_kvlist); + + if (*value == NULL) { + return -3; + } + + return 0; +} + +static inline int unpack_cfl_variant(mpack_reader_t *reader, + struct cfl_variant **value) +{ + mpack_type_t value_type; + int result; + mpack_tag_t tag; + + tag = mpack_peek_tag(reader); + + if (mpack_ok != mpack_reader_error(reader)) { + return -1; + } + + value_type = mpack_tag_type(&tag); + + if (value_type == mpack_type_str) { + result = unpack_cfl_variant_string(reader, value); + } + else if (value_type == mpack_type_str) { + result = unpack_cfl_variant_boolean(reader, value); + } + else if (value_type == mpack_type_int) { + result = unpack_cfl_variant_int64(reader, value); + } + else if (value_type == mpack_type_uint) { + result = unpack_cfl_variant_uint64(reader, value); + } + else if (value_type == mpack_type_double) { + result = unpack_cfl_variant_double(reader, value); + } + else if (value_type == mpack_type_array) { + result = unpack_cfl_variant_array(reader, value); + } + else if (value_type == mpack_type_map) { + result = unpack_cfl_variant_kvlist(reader, value); + } + else if (value_type == mpack_type_bin) { + result = unpack_cfl_variant_binary(reader, value); + } + else { + result = -1; + } + + return result; +} + +#endif |