summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/processor_attributes
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/processor_attributes')
-rw-r--r--src/fluent-bit/plugins/processor_attributes/CMakeLists.txt4
-rw-r--r--src/fluent-bit/plugins/processor_attributes/attributes.c1408
-rw-r--r--src/fluent-bit/plugins/processor_attributes/variant_utils.h626
3 files changed, 2038 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt b/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt
new file mode 100644
index 000000000..db01390ed
--- /dev/null
+++ b/src/fluent-bit/plugins/processor_attributes/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(src
+ attributes.c)
+
+FLB_PLUGIN(processor_attributes "${src}" "")
diff --git a/src/fluent-bit/plugins/processor_attributes/attributes.c b/src/fluent-bit/plugins/processor_attributes/attributes.c
new file mode 100644
index 000000000..a59c07ae2
--- /dev/null
+++ b/src/fluent-bit/plugins/processor_attributes/attributes.c
@@ -0,0 +1,1408 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <math.h>
+
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_processor_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_hash.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_processor.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include <cmetrics/cmetrics.h>
+#include <cmetrics/cmt_histogram.h>
+#include <cmetrics/cmt_summary.h>
+#include <cmetrics/cmt_untyped.h>
+#include <cmetrics/cmt_counter.h>
+#include <cmetrics/cmt_gauge.h>
+#include <cmetrics/cmt_map.h>
+
+#include <cfl/cfl.h>
+
+#include "variant_utils.h"
+
+typedef int (*attribute_transformer)(void *, struct cfl_variant *value);
+
+struct internal_processor_context {
+ struct mk_list *update_list;
+ struct mk_list *insert_list;
+ struct mk_list *upsert_list;
+ struct mk_list *convert_list;
+ struct mk_list *extract_list;
+ struct mk_list *delete_list;
+ struct mk_list *hash_list;
+
+ /* internal attributes ready to append */
+ struct cfl_list update_attributes;
+ struct cfl_list insert_attributes;
+ struct cfl_list upsert_attributes;
+ struct cfl_list convert_attributes;
+ struct cfl_list extract_attributes;
+ struct mk_list delete_attributes;
+ struct mk_list hash_attributes;
+
+ struct flb_processor_instance *instance;
+ struct flb_config *config;
+};
+
+/*
+ * LOCAL
+ */
+static int hex_encode(unsigned char *input_buffer,
+ size_t input_length,
+ cfl_sds_t *output_buffer)
+{
+ const char hex[] = "0123456789abcdef";
+ cfl_sds_t result;
+ size_t index;
+
+ if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) {
+ result = cfl_sds_increase(*output_buffer,
+ (input_length * 2) -
+ cfl_sds_alloc(*output_buffer));
+
+ if (result == NULL) {
+ return FLB_FALSE;
+ }
+
+ *output_buffer = result;
+ }
+
+ for (index = 0; index < input_length; index++) {
+ (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF];
+ (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF];
+ }
+
+ cfl_sds_set_len(*output_buffer, input_length * 2);
+
+ (*output_buffer)[index * 2] = '\0';
+
+ return FLB_TRUE;
+}
+
+static int process_attribute_modification_list_setting(
+ struct flb_processor_instance *plugin_instance,
+ const char *setting_name,
+ struct mk_list *source_list,
+ struct mk_list *destination_list)
+{
+ struct flb_config_map_val *source_entry;
+ struct mk_list *iterator;
+ int result;
+
+ if (source_list == NULL ||
+ mk_list_is_empty(source_list) == 0) {
+
+ return 0;
+ }
+
+ flb_config_map_foreach(iterator, source_entry, source_list) {
+ result = flb_slist_add(destination_list, source_entry->val.str);
+
+ if (result != 0) {
+ flb_plg_error(plugin_instance,
+ "could not append attribute name %s\n",
+ source_entry->val.str);
+
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int process_attribute_modification_kvlist_setting(
+ struct flb_processor_instance *plugin_instance,
+ const char *setting_name,
+ struct mk_list *source_list,
+ struct cfl_list *destination_list)
+{
+ struct cfl_kv *processed_pair;
+ struct flb_config_map_val *source_entry;
+ struct mk_list *iterator;
+ struct flb_slist_entry *value;
+ struct flb_slist_entry *key;
+
+ if (source_list == NULL ||
+ mk_list_is_empty(source_list) == 0) {
+
+ return 0;
+ }
+
+ flb_config_map_foreach(iterator, source_entry, source_list) {
+ if (mk_list_size(source_entry->val.list) != 2) {
+ flb_plg_error(plugin_instance,
+ "'%s' expects a key and a value, "
+ "e.g: '%s version 1.8.0'",
+ setting_name, setting_name);
+
+ return -1;
+ }
+
+ key = mk_list_entry_first(source_entry->val.list,
+ struct flb_slist_entry, _head);
+
+ value = mk_list_entry_last(source_entry->val.list,
+ struct flb_slist_entry, _head);
+
+ processed_pair = cfl_kv_item_create(destination_list,
+ key->str,
+ value->str);
+
+ if (processed_pair == NULL) {
+ flb_plg_error(plugin_instance,
+ "could not append attribute %s=%s\n",
+ key->str,
+ value->str);
+
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void destroy_context(struct internal_processor_context *context)
+{
+ if (context != NULL) {
+ cfl_kv_release(&context->update_attributes);
+ cfl_kv_release(&context->insert_attributes);
+ cfl_kv_release(&context->upsert_attributes);
+ cfl_kv_release(&context->convert_attributes);
+ cfl_kv_release(&context->extract_attributes);
+ flb_slist_destroy(&context->delete_attributes);
+ flb_slist_destroy(&context->hash_attributes);
+
+ flb_free(context);
+ }
+}
+
+static struct internal_processor_context *
+ create_context(struct flb_processor_instance *processor_instance,
+ struct flb_config *config)
+{
+ struct internal_processor_context *context;
+ int result;
+
+ context = flb_calloc(1, sizeof(struct internal_processor_context));
+
+ if (context != NULL) {
+ context->instance = processor_instance;
+ context->config = config;
+
+ cfl_kv_init(&context->update_attributes);
+ cfl_kv_init(&context->insert_attributes);
+ cfl_kv_init(&context->upsert_attributes);
+ cfl_kv_init(&context->convert_attributes);
+ cfl_kv_init(&context->extract_attributes);
+ flb_slist_create(&context->delete_attributes);
+ flb_slist_create(&context->hash_attributes);
+
+ result = flb_processor_instance_config_map_set(processor_instance, (void *) context);
+
+ if (result == 0) {
+ result = process_attribute_modification_kvlist_setting(
+ processor_instance,
+ "update",
+ context->update_list,
+ &context->update_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_kvlist_setting(
+ processor_instance,
+ "insert",
+ context->insert_list,
+ &context->insert_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_kvlist_setting(
+ processor_instance,
+ "convert",
+ context->convert_list,
+ &context->convert_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_kvlist_setting(
+ processor_instance,
+ "extract",
+ context->extract_list,
+ &context->extract_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_kvlist_setting(
+ processor_instance,
+ "upsert",
+ context->upsert_list,
+ &context->upsert_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_list_setting(
+ processor_instance,
+ "delete",
+ context->delete_list,
+ &context->delete_attributes);
+ }
+
+ if (result == 0) {
+ result = process_attribute_modification_list_setting(
+ processor_instance,
+ "hash",
+ context->hash_list,
+ &context->hash_attributes);
+ }
+
+ if (result != 0) {
+ destroy_context(context);
+
+ context = NULL;
+ }
+ }
+ else {
+ flb_errno();
+ }
+
+ return context;
+}
+
+static int cb_init(struct flb_processor_instance *processor_instance,
+ void *source_plugin_instance,
+ int source_plugin_type,
+ struct flb_config *config)
+{
+ processor_instance->context = (void *) create_context(
+ processor_instance, config);
+
+ if (processor_instance->context == NULL) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+
+static int cb_exit(struct flb_processor_instance *processor_instance)
+{
+ if (processor_instance != NULL &&
+ processor_instance->context != NULL) {
+ destroy_context(processor_instance->context);
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int cfl_kvlist_contains(struct cfl_kvlist *kvlist,
+ char *name)
+{
+ struct cfl_list *iterator;
+ struct cfl_kvpair *pair;
+
+ cfl_list_foreach(iterator, &kvlist->list) {
+ pair = cfl_list_entry(iterator,
+ struct cfl_kvpair, _head);
+
+ if (strcasecmp(pair->key, name) == 0) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static void cfl_kvpair_destroy(struct cfl_kvpair *pair)
+{
+ if (pair != NULL) {
+ if (!cfl_list_entry_is_orphan(&pair->_head)) {
+ cfl_list_del(&pair->_head);
+ }
+
+ if (pair->key != NULL) {
+ cfl_sds_destroy(pair->key);
+ }
+
+ if (pair->val != NULL) {
+ cfl_variant_destroy(pair->val);
+ }
+
+ free(pair);
+ }
+}
+
+static int cfl_kvlist_remove(struct cfl_kvlist *kvlist,
+ char *name)
+{
+ struct cfl_list *iterator_backup;
+ struct cfl_list *iterator;
+ struct cfl_kvpair *pair;
+
+ cfl_list_foreach_safe(iterator, iterator_backup, &kvlist->list) {
+ pair = cfl_list_entry(iterator,
+ struct cfl_kvpair, _head);
+
+ if (strcasecmp(pair->key, name) == 0) {
+ cfl_kvpair_destroy(pair);
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+
+/* local declarations */
+
+
+static cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value)
+{
+ cfl_sds_t json_result;
+ mpack_writer_t writer;
+ char *data;
+ size_t size;
+
+ data = NULL;
+ size = 0;
+
+ mpack_writer_init_growable(&writer, &data, &size);
+
+ pack_cfl_variant(&writer, value);
+
+ mpack_writer_destroy(&writer);
+
+ json_result = flb_msgpack_raw_to_json_sds(data, size);
+
+ return json_result;
+}
+
+
+
+static int cfl_variant_convert(struct cfl_variant *input_value,
+ struct cfl_variant **output_value,
+ int output_type)
+{
+ char *converstion_canary;
+ struct cfl_variant temporary_value;
+ int errno_backup;
+
+ errno_backup = errno;
+ *output_value = cfl_variant_create();
+
+ memset(&temporary_value, 0, sizeof(struct cfl_variant));
+
+ temporary_value.type = output_type;
+
+ if (input_value->type == CFL_VARIANT_STRING ||
+ input_value->type == CFL_VARIANT_BYTES ||
+ input_value->type == CFL_VARIANT_REFERENCE) {
+ if (output_type == CFL_VARIANT_STRING ||
+ output_type == CFL_VARIANT_BYTES) {
+ temporary_value.data.as_string =
+ cfl_sds_create_len(
+ input_value->data.as_string,
+ cfl_sds_len(input_value->data.as_string));
+
+ if (temporary_value.data.as_string == NULL) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_BOOL) {
+ temporary_value.data.as_bool = CFL_FALSE;
+
+ if (strcasecmp(input_value->data.as_string, "true") == 0) {
+ temporary_value.data.as_bool = CFL_TRUE;
+ }
+ else if (strcasecmp(input_value->data.as_string, "off") == 0) {
+ temporary_value.data.as_bool = CFL_TRUE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_INT) {
+ errno = 0;
+ temporary_value.data.as_int64 = strtoimax(input_value->data.as_string,
+ &converstion_canary,
+ 10);
+
+ if (errno == ERANGE || errno == EINVAL) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ errno = errno_backup;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_DOUBLE) {
+ errno = 0;
+ converstion_canary = NULL;
+ temporary_value.data.as_double = strtod(input_value->data.as_string,
+ &converstion_canary);
+
+ if (errno == ERANGE) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ errno = errno_backup;
+
+ return CFL_FALSE;
+ }
+ else if (temporary_value.data.as_double == 0 &&
+ converstion_canary == input_value->data.as_string) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ errno = errno_backup;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_ARRAY) {
+ temporary_value.data.as_array = cfl_array_create(1);
+
+ if (temporary_value.data.as_array == NULL) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+
+ if (cfl_array_append_bytes(temporary_value.data.as_array,
+ input_value->data.as_bytes,
+ cfl_sds_len(input_value->data.as_bytes)) != 0) {
+ cfl_array_destroy(temporary_value.data.as_array);
+
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+
+ temporary_value.data.as_array->entries[0]->type = output_type;
+ }
+ else {
+ return CFL_FALSE;
+ }
+ }
+ else if (input_value->type == CFL_VARIANT_INT) {
+ if (output_type == CFL_VARIANT_STRING ||
+ output_type == CFL_VARIANT_BYTES) {
+ temporary_value.data.as_string = cfl_sds_create_size(64);
+
+ if (temporary_value.data.as_string == NULL) {
+ return CFL_FALSE;
+ }
+
+ /* We need to fix the wesleys truncation PR to cfl */
+ converstion_canary = (char *) cfl_sds_printf(
+ &temporary_value.data.as_string,
+ "%" PRIi64,
+ input_value->data.as_int64);
+
+ if (converstion_canary == NULL) {
+ cfl_sds_destroy(temporary_value.data.as_string);
+
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_BOOL) {
+ temporary_value.data.as_bool = CFL_FALSE;
+
+ if (input_value->data.as_int64 != 0) {
+ temporary_value.data.as_bool = CFL_TRUE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_INT) {
+ temporary_value.data.as_int64 = input_value->data.as_int64;
+ }
+ else if (output_type == CFL_VARIANT_DOUBLE) {
+ temporary_value.data.as_double = (double) input_value->data.as_int64;
+
+ /* This conversion could be lossy, we need to determine what we want to
+ * do in that case
+ */
+ if ((int64_t) temporary_value.data.as_double != input_value->data.as_int64) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_ARRAY) {
+ temporary_value.data.as_array = cfl_array_create(1);
+
+ if (temporary_value.data.as_array == NULL) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+
+ if (cfl_array_append_int64(temporary_value.data.as_array,
+ input_value->data.as_int64) != 0) {
+ cfl_array_destroy(temporary_value.data.as_array);
+
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else {
+ return CFL_FALSE;
+ }
+ }
+ else if (input_value->type == CFL_VARIANT_DOUBLE) {
+ if (output_type == CFL_VARIANT_STRING ||
+ output_type == CFL_VARIANT_BYTES) {
+ temporary_value.data.as_string = cfl_sds_create_size(64);
+
+ if (temporary_value.data.as_string == NULL) {
+ return CFL_FALSE;
+ }
+
+ /* We need to fix the wesleys truncation PR to cfl */
+ converstion_canary = (char *) cfl_sds_printf(
+ &temporary_value.data.as_string,
+ "%.17g",
+ input_value->data.as_double);
+
+ if (converstion_canary == NULL) {
+ cfl_sds_destroy(temporary_value.data.as_string);
+
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_BOOL) {
+ temporary_value.data.as_bool = CFL_FALSE;
+
+ if (input_value->data.as_double != 0) {
+ temporary_value.data.as_bool = CFL_TRUE;
+ }
+ }
+ else if (output_type == CFL_VARIANT_INT) {
+ temporary_value.data.as_int64 = (int64_t) round(input_value->data.as_double);
+ }
+ else if (output_type == CFL_VARIANT_DOUBLE) {
+ temporary_value.data.as_double = input_value->data.as_int64;
+ }
+ else if (output_type == CFL_VARIANT_ARRAY) {
+ temporary_value.data.as_array = cfl_array_create(1);
+
+ if (temporary_value.data.as_array == NULL) {
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+
+ if (cfl_array_append_double(temporary_value.data.as_array,
+ input_value->data.as_double) != 0) {
+ cfl_array_destroy(temporary_value.data.as_array);
+
+ cfl_variant_destroy(*output_value);
+ *output_value = NULL;
+
+ return CFL_FALSE;
+ }
+ }
+ else {
+ return CFL_FALSE;
+ }
+ }
+ else if (input_value->type == CFL_VARIANT_KVLIST) {
+ if (output_type == CFL_VARIANT_STRING ||
+ output_type == CFL_VARIANT_BYTES) {
+ temporary_value.data.as_string = cfl_variant_convert_to_json(input_value);
+
+ if (temporary_value.data.as_string == NULL) {
+ return CFL_FALSE;
+ }
+ }
+ else {
+ return CFL_FALSE;
+ }
+ }
+ else if (input_value->type == CFL_VARIANT_ARRAY) {
+ if (output_type == CFL_VARIANT_STRING ||
+ output_type == CFL_VARIANT_BYTES) {
+ temporary_value.data.as_string = cfl_variant_convert_to_json(input_value);
+
+ if (temporary_value.data.as_string == NULL) {
+ return CFL_FALSE;
+ }
+ }
+ else {
+ return CFL_FALSE;
+ }
+ }
+
+ memcpy(*output_value, &temporary_value, sizeof(struct cfl_variant));
+
+ return FLB_TRUE;
+}
+
+static int span_contains_attribute(struct ctrace_span *span,
+ char *name)
+{
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ return cfl_kvlist_contains(span->attr->kv, name);
+}
+
+static int span_remove_attribute(struct ctrace_span *span,
+ char *name)
+{
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ return cfl_kvlist_remove(span->attr->kv, name);
+}
+
+static int span_update_attribute(struct ctrace_span *span,
+ char *name,
+ char *value)
+{
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ cfl_kvlist_remove(span->attr->kv, name);
+
+ if (ctr_span_set_attribute_string(span, name, value) != 0) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static int span_insert_attribute(struct ctrace_span *span,
+ char *name,
+ char *value)
+{
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ if (ctr_span_set_attribute_string(span, name, value) != 0) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static int span_transform_attribute(struct ctrace_span *span,
+ char *name,
+ attribute_transformer transformer)
+{
+ struct cfl_variant *attribute;
+
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ attribute = cfl_kvlist_fetch(span->attr->kv, name);
+
+ if (attribute == NULL) {
+ return FLB_FALSE;
+ }
+
+ return transformer(NULL, attribute);
+}
+
+static int span_convert_attribute(struct ctrace_span *span,
+ char *name,
+ char *new_type)
+{
+ struct cfl_variant *converted_attribute;
+ int new_type_constant;
+ struct cfl_variant *attribute;
+ int result;
+
+ if (strcasecmp(new_type, "string") == 0 ||
+ strcasecmp(new_type, "str") == 0) {
+ new_type_constant = CFL_VARIANT_STRING;
+ }
+ else if (strcasecmp(new_type, "bytes") == 0) {
+ new_type_constant = CFL_VARIANT_BYTES;
+ }
+ else if (strcasecmp(new_type, "boolean") == 0 ||
+ strcasecmp(new_type, "bool") == 0) {
+ new_type_constant = CFL_VARIANT_BOOL;
+ }
+ else if (strcasecmp(new_type, "integer") == 0 ||
+ strcasecmp(new_type, "int64") == 0 ||
+ strcasecmp(new_type, "int") == 0) {
+ new_type_constant = CFL_VARIANT_INT;
+ }
+ else if (strcasecmp(new_type, "double") == 0 ||
+ strcasecmp(new_type, "dbl") == 0) {
+ new_type_constant = CFL_VARIANT_DOUBLE;
+ }
+ else if (strcasecmp(new_type, "array") == 0) {
+ new_type_constant = CFL_VARIANT_ARRAY;
+ }
+ else {
+ return FLB_FALSE;
+ }
+
+ if (span->attr == NULL) {
+ return FLB_FALSE;
+ }
+
+ attribute = cfl_kvlist_fetch(span->attr->kv, name);
+
+ if (attribute == NULL) {
+ return FLB_FALSE;
+ }
+
+ result = cfl_variant_convert(attribute,
+ &converted_attribute,
+ new_type_constant);
+
+ if (result != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+
+ result = cfl_kvlist_remove(span->attr->kv, name);
+
+ if (result != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+
+
+ result = cfl_kvlist_insert(span->attr->kv, name, converted_attribute);
+
+ if (result != 0) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static void attribute_match_cb(const char *name,
+ const char *value,
+ size_t value_length,
+ void *context)
+{
+ cfl_sds_t temporary_value;
+ struct ctrace_span *span;
+
+ temporary_value = cfl_sds_create_len(value, value_length);
+
+ if (temporary_value != NULL) {
+ span = (struct ctrace_span *) context;
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ span_remove_attribute(span, name);
+ }
+
+ ctr_span_set_attribute_string(span, name, temporary_value);
+
+ cfl_sds_destroy(temporary_value);
+ }
+}
+
+static int span_extract_attributes(struct ctrace_span *span,
+ char *name,
+ char *pattern)
+{
+ ssize_t match_count;
+ struct flb_regex_search match_list;
+ struct cfl_variant *attribute;
+ int result;
+ struct flb_regex *regex;
+
+ regex = flb_regex_create(pattern);
+
+ if (regex == NULL) {
+ return FLB_FALSE;
+ }
+
+ attribute = cfl_kvlist_fetch(span->attr->kv, name);
+
+ if (attribute == NULL) {
+ flb_regex_destroy(regex);
+
+ return FLB_FALSE;
+ }
+
+
+ if (attribute->type != CFL_VARIANT_STRING) {
+ flb_regex_destroy(regex);
+
+ return FLB_FALSE;
+ }
+
+ match_count = flb_regex_do(regex,
+ attribute->data.as_string,
+ cfl_sds_len(attribute->data.as_string),
+ &match_list);
+
+ if (match_count <= 0) {
+ flb_regex_destroy(regex);
+
+ return FLB_FALSE;
+ }
+
+
+ result = flb_regex_parse(regex,
+ &match_list,
+ attribute_match_cb,
+ (void *) span);
+
+ flb_regex_destroy(regex);
+
+ if (result == -1) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_contains_attribute(struct ctrace *traces_context,
+ char *name)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static int hash_transformer(void *context, struct cfl_variant *value)
+{
+ unsigned char digest_buffer[32];
+ struct cfl_variant *converted_value;
+ cfl_sds_t encoded_hash;
+ int result;
+
+ if (value == NULL) {
+ return FLB_FALSE;
+ }
+
+ result = cfl_variant_convert(value,
+ &converted_value,
+ CFL_VARIANT_STRING);
+
+ if (result != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+
+ if (cfl_sds_len(converted_value->data.as_string) == 0) {
+ cfl_variant_destroy(converted_value);
+
+ return FLB_TRUE;
+ }
+
+ result = flb_hash_simple(FLB_HASH_SHA256,
+ (unsigned char *) converted_value->data.as_string,
+ cfl_sds_len(converted_value->data.as_string),
+ digest_buffer,
+ sizeof(digest_buffer));
+
+ if (result != FLB_CRYPTO_SUCCESS) {
+ cfl_variant_destroy(converted_value);
+
+ return FLB_FALSE;
+ }
+
+ result = hex_encode(digest_buffer,
+ sizeof(digest_buffer),
+ &converted_value->data.as_string);
+
+ if (result != FLB_TRUE) {
+ cfl_variant_destroy(converted_value);
+
+ return FLB_FALSE;
+ }
+
+ encoded_hash = cfl_sds_create(converted_value->data.as_string);
+
+ if (encoded_hash == NULL) {
+ cfl_variant_destroy(converted_value);
+
+ return FLB_FALSE;
+ }
+
+ if (value->type == CFL_VARIANT_STRING ||
+ value->type == CFL_VARIANT_BYTES) {
+ cfl_sds_destroy(value->data.as_string);
+ }
+ else if (value->type == CFL_VARIANT_ARRAY) {
+ cfl_array_destroy(value->data.as_array);
+ }
+ else if (value->type == CFL_VARIANT_KVLIST) {
+ cfl_kvlist_destroy(value->data.as_kvlist);
+ }
+
+ value->type = CFL_VARIANT_STRING;
+ value->data.as_string = encoded_hash;
+
+ return FLB_TRUE;
+}
+
+static int traces_context_hash_attribute(struct ctrace *traces_context,
+ char *name)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_transform_attribute(span, name, hash_transformer) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_remove_attribute(struct ctrace *traces_context,
+ char *name)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_remove_attribute(span, name) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_update_attribute(struct ctrace *traces_context,
+ char *name,
+ char *value)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_update_attribute(span, name, value) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_insert_attribute(struct ctrace *traces_context,
+ char *name,
+ char *value)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (!span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_insert_attribute(span, name, value) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_upsert_attribute(struct ctrace *traces_context,
+ char *name,
+ char *value)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_update_attribute(span, name, value) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ else {
+ if (span_insert_attribute(span, name, value) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_convert_attribute(struct ctrace *traces_context,
+ char *name,
+ char *new_type)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_convert_attribute(span, name, new_type) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int traces_context_extract_attribute(struct ctrace *traces_context,
+ char *name,
+ char *pattern)
+{
+ struct cfl_list *iterator;
+ struct ctrace_span *span;
+
+ cfl_list_foreach(iterator, &traces_context->span_list) {
+ span = cfl_list_entry(iterator,
+ struct ctrace_span, _head_global);
+
+ if (span_contains_attribute(span, name) == FLB_TRUE) {
+ if (span_extract_attributes(span, name, pattern) != FLB_TRUE) {
+ return FLB_FALSE;
+ }
+ }
+ }
+
+ return FLB_TRUE;
+}
+
+static int delete_attributes(struct ctrace *traces_context,
+ struct mk_list *attributes)
+{
+ struct mk_list *iterator;
+ int result;
+ struct flb_slist_entry *entry;
+
+ mk_list_foreach(iterator, attributes) {
+ entry = mk_list_entry(iterator, struct flb_slist_entry, _head);
+
+ result = traces_context_contains_attribute(traces_context,
+ entry->str);
+
+ if (result == FLB_TRUE) {
+ result = traces_context_remove_attribute(traces_context,
+ entry->str);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int update_attributes(struct ctrace *traces_context,
+ struct cfl_list *attributes)
+{
+ struct cfl_list *iterator;
+ int result;
+ struct cfl_kv *pair;
+
+ cfl_list_foreach(iterator, attributes) {
+ pair = cfl_list_entry(iterator, struct cfl_kv, _head);
+
+ result = traces_context_update_attribute(traces_context,
+ pair->key,
+ pair->val);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int upsert_attributes(struct ctrace *traces_context,
+ struct cfl_list *attributes)
+{
+ struct cfl_list *iterator;
+ int result;
+ struct cfl_kv *pair;
+
+ cfl_list_foreach(iterator, attributes) {
+ pair = cfl_list_entry(iterator, struct cfl_kv, _head);
+
+ result = traces_context_upsert_attribute(traces_context,
+ pair->key,
+ pair->val);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int convert_attributes(struct ctrace *traces_context,
+ struct cfl_list *attributes)
+{
+ struct cfl_list *iterator;
+ int result;
+ struct cfl_kv *pair;
+
+ cfl_list_foreach(iterator, attributes) {
+ pair = cfl_list_entry(iterator, struct cfl_kv, _head);
+
+ result = traces_context_convert_attribute(traces_context,
+ pair->key,
+ pair->val);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int extract_attributes(struct ctrace *traces_context,
+ struct cfl_list *attributes)
+{
+ struct cfl_list *iterator;
+ int result;
+ struct cfl_kv *pair;
+
+ cfl_list_foreach(iterator, attributes) {
+ pair = cfl_list_entry(iterator, struct cfl_kv, _head);
+
+ result = traces_context_extract_attribute(traces_context,
+ pair->key,
+ pair->val);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int insert_attributes(struct ctrace *traces_context,
+ struct cfl_list *attributes)
+{
+ struct cfl_list *iterator;
+ int result;
+ struct cfl_kv *pair;
+
+ cfl_list_foreach(iterator, attributes) {
+ pair = cfl_list_entry(iterator, struct cfl_kv, _head);
+
+ result = traces_context_insert_attribute(traces_context,
+ pair->key,
+ pair->val);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int hash_attributes(struct ctrace *traces_context,
+ struct mk_list *attributes)
+{
+ struct mk_list *iterator;
+ int result;
+ struct flb_slist_entry *entry;
+
+ mk_list_foreach(iterator, attributes) {
+ entry = mk_list_entry(iterator, struct flb_slist_entry, _head);
+
+ result = traces_context_contains_attribute(traces_context,
+ entry->str);
+
+ if (result == FLB_TRUE) {
+ result = traces_context_hash_attribute(traces_context,
+ entry->str);
+
+ if (result == FLB_FALSE) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+ }
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static int cb_process_traces(struct flb_processor_instance *processor_instance,
+ struct ctrace *traces_context,
+ const char *tag,
+ int tag_len)
+{
+ struct internal_processor_context *processor_context;
+ int result;
+
+ processor_context =
+ (struct internal_processor_context *) processor_instance->context;
+
+ result = delete_attributes(traces_context,
+ &processor_context->delete_attributes);
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = update_attributes(traces_context,
+ &processor_context->update_attributes);
+ }
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = upsert_attributes(traces_context,
+ &processor_context->upsert_attributes);
+ }
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = insert_attributes(traces_context,
+ &processor_context->insert_attributes);
+ }
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = convert_attributes(traces_context,
+ &processor_context->convert_attributes);
+ result = FLB_PROCESSOR_SUCCESS;
+ }
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = extract_attributes(traces_context,
+ &processor_context->extract_attributes);
+ }
+
+ if (result == FLB_PROCESSOR_SUCCESS) {
+ result = hash_attributes(traces_context,
+ &processor_context->hash_attributes);
+ }
+
+ if (result != FLB_PROCESSOR_SUCCESS) {
+ return FLB_PROCESSOR_FAILURE;
+ }
+
+ return FLB_PROCESSOR_SUCCESS;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_SLIST_1, "update", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ update_list),
+ "Updates an attribute. Usage : 'update name value'"
+ },
+ {
+ FLB_CONFIG_MAP_SLIST_1, "insert", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ insert_list),
+ "Inserts an attribute. Usage : 'insert name value'"
+ },
+ {
+ FLB_CONFIG_MAP_SLIST_1, "upsert", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ upsert_list),
+ "Inserts or updates an attribute. Usage : 'upsert name value'"
+ },
+ {
+ FLB_CONFIG_MAP_SLIST_1, "convert", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ convert_list),
+ "Converts an attribute. Usage : 'convert name new_type'"
+ },
+ {
+ FLB_CONFIG_MAP_SLIST_1, "extract", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ extract_list),
+ "Extracts regular expression match groups as individual attributes. Usage : 'extract (?P<first_word>[^ ]*) (?P<second_word>[^ ]*)'"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "delete", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ delete_list),
+ "Deletes an attribute. Usage : 'delete name'"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "hash", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context,
+ hash_list),
+ "Replaces an attributes value with its SHA256 hash. Usage : 'hash name'"
+ },
+
+ /* EOF */
+ {0}
+};
+
+struct flb_processor_plugin processor_attributes_plugin = {
+ .name = "attributes",
+ .description = "Modifies metrics attributes",
+ .cb_init = cb_init,
+ .cb_process_logs = NULL,
+ .cb_process_metrics = NULL,
+ .cb_process_traces = cb_process_traces,
+ .cb_exit = cb_exit,
+ .config_map = config_map,
+ .flags = 0
+};
diff --git a/src/fluent-bit/plugins/processor_attributes/variant_utils.h b/src/fluent-bit/plugins/processor_attributes/variant_utils.h
new file mode 100644
index 000000000..7ba376273
--- /dev/null
+++ b/src/fluent-bit/plugins/processor_attributes/variant_utils.h
@@ -0,0 +1,626 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* CMetrics
+ * ========
+ * Copyright 2021-2022 The CMetrics Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef VARIANT_UTILS_H
+#define VARIANT_UTILS_H
+
+#include <mpack/mpack.h>
+
+/* These are the only functions meant for general use,
+ * the reason why the kvlist packing and unpacking
+ * functions are exposed is the internal and external
+ * metadata kvlists in the cmetrics context are not
+ * contained by a variant instance.
+ *
+ * Result :
+ * Upon success all of these return 0, otherwise they will
+ * raise the innermost error code which should be treated
+ * as an opaque value.
+ *
+ * Notes :
+ * When decoding -1 means the check after mpack_read_tag
+ * failed and -2 means the type was not the one expected
+ */
+
+static inline int pack_cfl_variant(mpack_writer_t *writer,
+ struct cfl_variant *value);
+
+static inline int pack_cfl_variant_kvlist(mpack_writer_t *writer,
+ struct cfl_kvlist *kvlist);
+
+static inline int unpack_cfl_variant(mpack_reader_t *reader,
+ struct cfl_variant **value);
+
+static inline int unpack_cfl_kvlist(mpack_reader_t *reader,
+ struct cfl_kvlist **result_kvlist);
+
+/* Packers */
+static inline int pack_cfl_variant_string(mpack_writer_t *writer,
+ char *value)
+{
+ mpack_write_cstr(writer, value);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_binary(mpack_writer_t *writer,
+ char *value,
+ size_t length)
+{
+ mpack_write_bin(writer, value, length);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_boolean(mpack_writer_t *writer,
+ unsigned int value)
+{
+ mpack_write_bool(writer, value);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_int64(mpack_writer_t *writer,
+ int64_t value)
+{
+ mpack_write_int(writer, value);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_double(mpack_writer_t *writer,
+ double value)
+{
+ mpack_write_double(writer, value);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_array(mpack_writer_t *writer,
+ struct cfl_array *array)
+{
+ size_t entry_count;
+ struct cfl_variant *entry_value;
+ int result;
+ size_t index;
+
+ entry_count = array->entry_count;
+
+ mpack_start_array(writer, entry_count);
+
+ for (index = 0 ; index < entry_count ; index++) {
+ entry_value = cfl_array_fetch_by_index(array, index);
+
+ if (entry_value == NULL) {
+ return -1;
+ }
+
+ result = pack_cfl_variant(writer, entry_value);
+
+ if (result != 0) {
+ return result;
+ }
+ }
+
+ mpack_finish_array(writer);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant_kvlist(mpack_writer_t *writer,
+ struct cfl_kvlist *kvlist) {
+ size_t entry_count;
+ struct cfl_list *iterator;
+ struct cfl_kvpair *kvpair;
+ int result;
+
+ entry_count = cfl_kvlist_count(kvlist);
+
+ mpack_start_map(writer, entry_count);
+
+ cfl_list_foreach(iterator, &kvlist->list) {
+ kvpair = cfl_list_entry(iterator, struct cfl_kvpair, _head);
+
+ mpack_write_cstr(writer, kvpair->key);
+
+ result = pack_cfl_variant(writer, kvpair->val);
+
+ if (result != 0) {
+ return result;
+ }
+ }
+
+ mpack_finish_map(writer);
+
+ return 0;
+}
+
+static inline int pack_cfl_variant(mpack_writer_t *writer,
+ struct cfl_variant *value)
+{
+ int result;
+
+ if (value->type == CFL_VARIANT_STRING) {
+ result = pack_cfl_variant_string(writer, value->data.as_string);
+ }
+ else if (value->type == CFL_VARIANT_BOOL) {
+ result = pack_cfl_variant_boolean(writer, value->data.as_bool);
+ }
+ else if (value->type == CFL_VARIANT_INT) {
+ result = pack_cfl_variant_int64(writer, value->data.as_int64);
+ }
+ else if (value->type == CFL_VARIANT_DOUBLE) {
+ result = pack_cfl_variant_double(writer, value->data.as_double);
+ }
+ else if (value->type == CFL_VARIANT_ARRAY) {
+ result = pack_cfl_variant_array(writer, value->data.as_array);
+ }
+ else if (value->type == CFL_VARIANT_KVLIST) {
+ result = pack_cfl_variant_kvlist(writer, value->data.as_kvlist);
+ }
+ else if (value->type == CFL_VARIANT_BYTES) {
+ result = pack_cfl_variant_binary(writer,
+ value->data.as_bytes,
+ cfl_sds_len(value->data.as_bytes));
+ }
+ else if (value->type == CFL_VARIANT_REFERENCE) {
+ result = pack_cfl_variant_string(writer, value->data.as_string);
+ }
+ else {
+ result = -1;
+ }
+
+ return result;
+}
+
+/* Unpackers */
+
+static inline int unpack_cfl_variant_read_tag(mpack_reader_t *reader,
+ mpack_tag_t *tag,
+ mpack_type_t expected_type)
+{
+ *tag = mpack_read_tag(reader);
+
+ if (mpack_ok != mpack_reader_error(reader)) {
+ return -1;
+ }
+
+ if (mpack_tag_type(tag) != expected_type) {
+ return -2;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_array(mpack_reader_t *reader,
+ struct cfl_array **result_array)
+{
+ struct cfl_array *internal_array;
+ size_t entry_count;
+ struct cfl_variant *entry_value;
+ int result;
+ size_t index;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_array);
+
+ if (result != 0) {
+ return result;
+ }
+
+ entry_count = mpack_tag_array_count(&tag);
+
+ internal_array = cfl_array_create(entry_count);
+
+ if (internal_array == NULL) {
+ return -3;
+ }
+
+ for (index = 0 ; index < entry_count ; index++) {
+ result = unpack_cfl_variant(reader, &entry_value);
+
+ if (result != 0) {
+ cfl_array_destroy(internal_array);
+
+ return -4;
+ }
+
+ result = cfl_array_append(internal_array, entry_value);
+
+ if (result != 0) {
+ cfl_array_destroy(internal_array);
+
+ return -5;
+ }
+ }
+
+ mpack_done_array(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ cfl_array_destroy(internal_array);
+
+ return -6;
+ }
+
+ *result_array = internal_array;
+
+ return 0;
+}
+
+static inline int unpack_cfl_kvlist(mpack_reader_t *reader,
+ struct cfl_kvlist **result_kvlist)
+{
+ struct cfl_kvlist *internal_kvlist;
+ char key_name[256];
+ size_t entry_count;
+ size_t key_length;
+ struct cfl_variant *key_value;
+ mpack_tag_t key_tag;
+ int result;
+ size_t index;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_map);
+
+ if (result != 0) {
+ return result;
+ }
+
+ entry_count = mpack_tag_map_count(&tag);
+
+ internal_kvlist = cfl_kvlist_create();
+
+ if (internal_kvlist == NULL) {
+ return -3;
+ }
+
+ result = 0;
+ key_value = NULL;
+
+ for (index = 0 ; index < entry_count ; index++) {
+ result = unpack_cfl_variant_read_tag(reader, &key_tag, mpack_type_str);
+
+ if (result != 0) {
+ result = -4;
+
+ break;
+ }
+
+ key_length = mpack_tag_str_length(&key_tag);
+
+ if (key_length >= sizeof(key_name)) {
+ result = -5;
+
+ break;
+ }
+
+ mpack_read_cstr(reader, key_name, sizeof(key_name), key_length);
+
+ key_name[key_length] = '\0';
+
+ mpack_done_str(reader);
+
+ if (mpack_ok != mpack_reader_error(reader)) {
+ result = -6;
+
+ break;
+ }
+
+ result = unpack_cfl_variant(reader, &key_value);
+
+ if (result != 0) {
+ result = -7;
+
+ break;
+ }
+
+ result = cfl_kvlist_insert(internal_kvlist, key_name, key_value);
+
+ if (result != 0) {
+ result = -8;
+
+ break;
+ }
+
+ key_value = NULL;
+ }
+
+ mpack_done_map(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ result = -9;
+ }
+
+ if (result != 0) {
+ cfl_kvlist_destroy(internal_kvlist);
+
+ if (key_value != NULL) {
+ cfl_variant_destroy(key_value);
+ }
+ }
+ else {
+ *result_kvlist = internal_kvlist;
+ }
+
+ return result;
+}
+
+static inline int unpack_cfl_variant_string(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ size_t value_length;
+ char *value_data;
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_str);
+
+ if (result != 0) {
+ return result;
+ }
+
+ value_length = mpack_tag_str_length(&tag);
+
+ value_data = cfl_sds_create_size(value_length + 1);
+
+ if (value_data == NULL) {
+ return -3;
+ }
+
+ cfl_sds_set_len(value_data, value_length);
+
+ mpack_read_cstr(reader, value_data, value_length + 1, value_length);
+
+ mpack_done_str(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ cfl_sds_destroy(value_data);
+
+ return -4;
+ }
+
+ *value = cfl_variant_create_from_reference(value_data);
+
+ if (*value == NULL) {
+ return -5;
+ }
+
+ (*value)->type = CFL_VARIANT_STRING;
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_binary(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ size_t value_length;
+ char *value_data;
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_bin);
+
+ if (result != 0) {
+ return result;
+ }
+
+ value_length = mpack_tag_bin_length(&tag);
+
+ value_data = cfl_sds_create_size(value_length);
+
+ if (value_data == NULL) {
+ return -3;
+ }
+
+ cfl_sds_set_len(value_data, value_length);
+
+ mpack_read_bytes(reader, value_data, value_length);
+
+ mpack_done_bin(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ cfl_sds_destroy(value_data);
+
+ return -4;
+ }
+
+ *value = cfl_variant_create_from_reference(value_data);
+
+ if (*value == NULL) {
+ return -5;
+ }
+
+ (*value)->type = CFL_VARIANT_BYTES;
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_boolean(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_bool);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_bool((unsigned int) mpack_tag_bool_value(&tag));
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_uint64(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_uint);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_int64((int64_t) mpack_tag_uint_value(&tag));
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_int64(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_int);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_int64((int64_t) mpack_tag_int_value(&tag));
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_double(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ int result;
+ mpack_tag_t tag;
+
+ result = unpack_cfl_variant_read_tag(reader, &tag, mpack_type_double);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_double(mpack_tag_double_value(&tag));
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_array(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ struct cfl_array *unpacked_array;
+ int result;
+
+ result = unpack_cfl_array(reader, &unpacked_array);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_array(unpacked_array);
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant_kvlist(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ struct cfl_kvlist *unpacked_kvlist;
+ int result;
+
+ result = unpack_cfl_kvlist(reader, &unpacked_kvlist);
+
+ if (result != 0) {
+ return result;
+ }
+
+ *value = cfl_variant_create_from_kvlist(unpacked_kvlist);
+
+ if (*value == NULL) {
+ return -3;
+ }
+
+ return 0;
+}
+
+static inline int unpack_cfl_variant(mpack_reader_t *reader,
+ struct cfl_variant **value)
+{
+ mpack_type_t value_type;
+ int result;
+ mpack_tag_t tag;
+
+ tag = mpack_peek_tag(reader);
+
+ if (mpack_ok != mpack_reader_error(reader)) {
+ return -1;
+ }
+
+ value_type = mpack_tag_type(&tag);
+
+ if (value_type == mpack_type_str) {
+ result = unpack_cfl_variant_string(reader, value);
+ }
+ else if (value_type == mpack_type_str) {
+ result = unpack_cfl_variant_boolean(reader, value);
+ }
+ else if (value_type == mpack_type_int) {
+ result = unpack_cfl_variant_int64(reader, value);
+ }
+ else if (value_type == mpack_type_uint) {
+ result = unpack_cfl_variant_uint64(reader, value);
+ }
+ else if (value_type == mpack_type_double) {
+ result = unpack_cfl_variant_double(reader, value);
+ }
+ else if (value_type == mpack_type_array) {
+ result = unpack_cfl_variant_array(reader, value);
+ }
+ else if (value_type == mpack_type_map) {
+ result = unpack_cfl_variant_kvlist(reader, value);
+ }
+ else if (value_type == mpack_type_bin) {
+ result = unpack_cfl_variant_binary(reader, value);
+ }
+ else {
+ result = -1;
+ }
+
+ return result;
+}
+
+#endif