diff options
Diffstat (limited to 'fluent-bit/plugins/processor_attributes/attributes.c')
-rw-r--r-- | fluent-bit/plugins/processor_attributes/attributes.c | 1408 |
1 files changed, 1408 insertions, 0 deletions
diff --git a/fluent-bit/plugins/processor_attributes/attributes.c b/fluent-bit/plugins/processor_attributes/attributes.c new file mode 100644 index 00000000..a59c07ae --- /dev/null +++ b/fluent-bit/plugins/processor_attributes/attributes.c @@ -0,0 +1,1408 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <math.h> + +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_processor_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_hash.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_processor.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <cmetrics/cmetrics.h> +#include <cmetrics/cmt_histogram.h> +#include <cmetrics/cmt_summary.h> +#include <cmetrics/cmt_untyped.h> +#include <cmetrics/cmt_counter.h> +#include <cmetrics/cmt_gauge.h> +#include <cmetrics/cmt_map.h> + +#include <cfl/cfl.h> + +#include "variant_utils.h" + +typedef int (*attribute_transformer)(void *, struct cfl_variant *value); + +struct internal_processor_context { + struct mk_list *update_list; + struct mk_list *insert_list; + struct mk_list *upsert_list; + struct mk_list *convert_list; + struct mk_list *extract_list; + struct mk_list *delete_list; + struct mk_list *hash_list; + + /* internal attributes ready to append */ + struct cfl_list update_attributes; + struct cfl_list insert_attributes; + struct cfl_list upsert_attributes; + struct cfl_list convert_attributes; + struct cfl_list extract_attributes; + struct mk_list delete_attributes; + struct mk_list hash_attributes; + + struct flb_processor_instance *instance; + struct flb_config *config; +}; + +/* + * LOCAL + */ +static int hex_encode(unsigned char *input_buffer, + size_t input_length, + cfl_sds_t *output_buffer) +{ + const char hex[] = "0123456789abcdef"; + cfl_sds_t result; + size_t index; + + if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) { + result = cfl_sds_increase(*output_buffer, + (input_length * 2) - + cfl_sds_alloc(*output_buffer)); + + if (result == NULL) { + return FLB_FALSE; + } + + *output_buffer = result; + } + + for (index = 0; index < input_length; index++) { + (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF]; + (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF]; + } + + cfl_sds_set_len(*output_buffer, input_length * 2); + + (*output_buffer)[index * 2] = '\0'; + + return FLB_TRUE; +} + +static int process_attribute_modification_list_setting( + struct flb_processor_instance *plugin_instance, + const char *setting_name, + struct mk_list *source_list, + struct mk_list *destination_list) +{ + struct flb_config_map_val *source_entry; + struct mk_list *iterator; + int result; + + if (source_list == NULL || + mk_list_is_empty(source_list) == 0) { + + return 0; + } + + flb_config_map_foreach(iterator, source_entry, source_list) { + result = flb_slist_add(destination_list, source_entry->val.str); + + if (result != 0) { + flb_plg_error(plugin_instance, + "could not append attribute name %s\n", + source_entry->val.str); + + return -1; + } + } + + return 0; +} + +static int process_attribute_modification_kvlist_setting( + struct flb_processor_instance *plugin_instance, + const char *setting_name, + struct mk_list *source_list, + struct cfl_list *destination_list) +{ + struct cfl_kv *processed_pair; + struct flb_config_map_val *source_entry; + struct mk_list *iterator; + struct flb_slist_entry *value; + struct flb_slist_entry *key; + + if (source_list == NULL || + mk_list_is_empty(source_list) == 0) { + + return 0; + } + + flb_config_map_foreach(iterator, source_entry, source_list) { + if (mk_list_size(source_entry->val.list) != 2) { + flb_plg_error(plugin_instance, + "'%s' expects a key and a value, " + "e.g: '%s version 1.8.0'", + setting_name, setting_name); + + return -1; + } + + key = mk_list_entry_first(source_entry->val.list, + struct flb_slist_entry, _head); + + value = mk_list_entry_last(source_entry->val.list, + struct flb_slist_entry, _head); + + processed_pair = cfl_kv_item_create(destination_list, + key->str, + value->str); + + if (processed_pair == NULL) { + flb_plg_error(plugin_instance, + "could not append attribute %s=%s\n", + key->str, + value->str); + + return -1; + } + } + + return 0; +} + +static void destroy_context(struct internal_processor_context *context) +{ + if (context != NULL) { + cfl_kv_release(&context->update_attributes); + cfl_kv_release(&context->insert_attributes); + cfl_kv_release(&context->upsert_attributes); + cfl_kv_release(&context->convert_attributes); + cfl_kv_release(&context->extract_attributes); + flb_slist_destroy(&context->delete_attributes); + flb_slist_destroy(&context->hash_attributes); + + flb_free(context); + } +} + +static struct internal_processor_context * + create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + struct internal_processor_context *context; + int result; + + context = flb_calloc(1, sizeof(struct internal_processor_context)); + + if (context != NULL) { + context->instance = processor_instance; + context->config = config; + + cfl_kv_init(&context->update_attributes); + cfl_kv_init(&context->insert_attributes); + cfl_kv_init(&context->upsert_attributes); + cfl_kv_init(&context->convert_attributes); + cfl_kv_init(&context->extract_attributes); + flb_slist_create(&context->delete_attributes); + flb_slist_create(&context->hash_attributes); + + result = flb_processor_instance_config_map_set(processor_instance, (void *) context); + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "update", + context->update_list, + &context->update_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "insert", + context->insert_list, + &context->insert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "convert", + context->convert_list, + &context->convert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "extract", + context->extract_list, + &context->extract_attributes); + } + + if (result == 0) { + result = process_attribute_modification_kvlist_setting( + processor_instance, + "upsert", + context->upsert_list, + &context->upsert_attributes); + } + + if (result == 0) { + result = process_attribute_modification_list_setting( + processor_instance, + "delete", + context->delete_list, + &context->delete_attributes); + } + + if (result == 0) { + result = process_attribute_modification_list_setting( + processor_instance, + "hash", + context->hash_list, + &context->hash_attributes); + } + + if (result != 0) { + destroy_context(context); + + context = NULL; + } + } + else { + flb_errno(); + } + + return context; +} + +static int cb_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + processor_instance->context = (void *) create_context( + processor_instance, config); + + if (processor_instance->context == NULL) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + + +static int cb_exit(struct flb_processor_instance *processor_instance) +{ + if (processor_instance != NULL && + processor_instance->context != NULL) { + destroy_context(processor_instance->context); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cfl_kvlist_contains(struct cfl_kvlist *kvlist, + char *name) +{ + struct cfl_list *iterator; + struct cfl_kvpair *pair; + + cfl_list_foreach(iterator, &kvlist->list) { + pair = cfl_list_entry(iterator, + struct cfl_kvpair, _head); + + if (strcasecmp(pair->key, name) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static void cfl_kvpair_destroy(struct cfl_kvpair *pair) +{ + if (pair != NULL) { + if (!cfl_list_entry_is_orphan(&pair->_head)) { + cfl_list_del(&pair->_head); + } + + if (pair->key != NULL) { + cfl_sds_destroy(pair->key); + } + + if (pair->val != NULL) { + cfl_variant_destroy(pair->val); + } + + free(pair); + } +} + +static int cfl_kvlist_remove(struct cfl_kvlist *kvlist, + char *name) +{ + struct cfl_list *iterator_backup; + struct cfl_list *iterator; + struct cfl_kvpair *pair; + + cfl_list_foreach_safe(iterator, iterator_backup, &kvlist->list) { + pair = cfl_list_entry(iterator, + struct cfl_kvpair, _head); + + if (strcasecmp(pair->key, name) == 0) { + cfl_kvpair_destroy(pair); + } + } + + return FLB_TRUE; +} + + +/* local declarations */ + + +static cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value) +{ + cfl_sds_t json_result; + mpack_writer_t writer; + char *data; + size_t size; + + data = NULL; + size = 0; + + mpack_writer_init_growable(&writer, &data, &size); + + pack_cfl_variant(&writer, value); + + mpack_writer_destroy(&writer); + + json_result = flb_msgpack_raw_to_json_sds(data, size); + + return json_result; +} + + + +static int cfl_variant_convert(struct cfl_variant *input_value, + struct cfl_variant **output_value, + int output_type) +{ + char *converstion_canary; + struct cfl_variant temporary_value; + int errno_backup; + + errno_backup = errno; + *output_value = cfl_variant_create(); + + memset(&temporary_value, 0, sizeof(struct cfl_variant)); + + temporary_value.type = output_type; + + if (input_value->type == CFL_VARIANT_STRING || + input_value->type == CFL_VARIANT_BYTES || + input_value->type == CFL_VARIANT_REFERENCE) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = + cfl_sds_create_len( + input_value->data.as_string, + cfl_sds_len(input_value->data.as_string)); + + if (temporary_value.data.as_string == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (strcasecmp(input_value->data.as_string, "true") == 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + else if (strcasecmp(input_value->data.as_string, "off") == 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + errno = 0; + temporary_value.data.as_int64 = strtoimax(input_value->data.as_string, + &converstion_canary, + 10); + + if (errno == ERANGE || errno == EINVAL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_DOUBLE) { + errno = 0; + converstion_canary = NULL; + temporary_value.data.as_double = strtod(input_value->data.as_string, + &converstion_canary); + + if (errno == ERANGE) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + else if (temporary_value.data.as_double == 0 && + converstion_canary == input_value->data.as_string) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + errno = errno_backup; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_bytes(temporary_value.data.as_array, + input_value->data.as_bytes, + cfl_sds_len(input_value->data.as_bytes)) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + temporary_value.data.as_array->entries[0]->type = output_type; + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_INT) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_sds_create_size(64); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + + /* We need to fix the wesleys truncation PR to cfl */ + converstion_canary = (char *) cfl_sds_printf( + &temporary_value.data.as_string, + "%" PRIi64, + input_value->data.as_int64); + + if (converstion_canary == NULL) { + cfl_sds_destroy(temporary_value.data.as_string); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (input_value->data.as_int64 != 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + temporary_value.data.as_int64 = input_value->data.as_int64; + } + else if (output_type == CFL_VARIANT_DOUBLE) { + temporary_value.data.as_double = (double) input_value->data.as_int64; + + /* This conversion could be lossy, we need to determine what we want to + * do in that case + */ + if ((int64_t) temporary_value.data.as_double != input_value->data.as_int64) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_int64(temporary_value.data.as_array, + input_value->data.as_int64) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_DOUBLE) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_sds_create_size(64); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + + /* We need to fix the wesleys truncation PR to cfl */ + converstion_canary = (char *) cfl_sds_printf( + &temporary_value.data.as_string, + "%.17g", + input_value->data.as_double); + + if (converstion_canary == NULL) { + cfl_sds_destroy(temporary_value.data.as_string); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + temporary_value.data.as_bool = CFL_FALSE; + + if (input_value->data.as_double != 0) { + temporary_value.data.as_bool = CFL_TRUE; + } + } + else if (output_type == CFL_VARIANT_INT) { + temporary_value.data.as_int64 = (int64_t) round(input_value->data.as_double); + } + else if (output_type == CFL_VARIANT_DOUBLE) { + temporary_value.data.as_double = input_value->data.as_int64; + } + else if (output_type == CFL_VARIANT_ARRAY) { + temporary_value.data.as_array = cfl_array_create(1); + + if (temporary_value.data.as_array == NULL) { + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + + if (cfl_array_append_double(temporary_value.data.as_array, + input_value->data.as_double) != 0) { + cfl_array_destroy(temporary_value.data.as_array); + + cfl_variant_destroy(*output_value); + *output_value = NULL; + + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_KVLIST) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_ARRAY) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); + + if (temporary_value.data.as_string == NULL) { + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + } + + memcpy(*output_value, &temporary_value, sizeof(struct cfl_variant)); + + return FLB_TRUE; +} + +static int span_contains_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_contains(span->attr->kv, name); +} + +static int span_remove_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_remove(span->attr->kv, name); +} + +static int span_update_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + cfl_kvlist_remove(span->attr->kv, name); + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_insert_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_transform_attribute(struct ctrace_span *span, + char *name, + attribute_transformer transformer) +{ + struct cfl_variant *attribute; + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + return FLB_FALSE; + } + + return transformer(NULL, attribute); +} + +static int span_convert_attribute(struct ctrace_span *span, + char *name, + char *new_type) +{ + struct cfl_variant *converted_attribute; + int new_type_constant; + struct cfl_variant *attribute; + int result; + + if (strcasecmp(new_type, "string") == 0 || + strcasecmp(new_type, "str") == 0) { + new_type_constant = CFL_VARIANT_STRING; + } + else if (strcasecmp(new_type, "bytes") == 0) { + new_type_constant = CFL_VARIANT_BYTES; + } + else if (strcasecmp(new_type, "boolean") == 0 || + strcasecmp(new_type, "bool") == 0) { + new_type_constant = CFL_VARIANT_BOOL; + } + else if (strcasecmp(new_type, "integer") == 0 || + strcasecmp(new_type, "int64") == 0 || + strcasecmp(new_type, "int") == 0) { + new_type_constant = CFL_VARIANT_INT; + } + else if (strcasecmp(new_type, "double") == 0 || + strcasecmp(new_type, "dbl") == 0) { + new_type_constant = CFL_VARIANT_DOUBLE; + } + else if (strcasecmp(new_type, "array") == 0) { + new_type_constant = CFL_VARIANT_ARRAY; + } + else { + return FLB_FALSE; + } + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(attribute, + &converted_attribute, + new_type_constant); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + result = cfl_kvlist_remove(span->attr->kv, name); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + + result = cfl_kvlist_insert(span->attr->kv, name, converted_attribute); + + if (result != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static void attribute_match_cb(const char *name, + const char *value, + size_t value_length, + void *context) +{ + cfl_sds_t temporary_value; + struct ctrace_span *span; + + temporary_value = cfl_sds_create_len(value, value_length); + + if (temporary_value != NULL) { + span = (struct ctrace_span *) context; + + if (span_contains_attribute(span, name) == FLB_TRUE) { + span_remove_attribute(span, name); + } + + ctr_span_set_attribute_string(span, name, temporary_value); + + cfl_sds_destroy(temporary_value); + } +} + +static int span_extract_attributes(struct ctrace_span *span, + char *name, + char *pattern) +{ + ssize_t match_count; + struct flb_regex_search match_list; + struct cfl_variant *attribute; + int result; + struct flb_regex *regex; + + regex = flb_regex_create(pattern); + + if (regex == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + + if (attribute->type != CFL_VARIANT_STRING) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + match_count = flb_regex_do(regex, + attribute->data.as_string, + cfl_sds_len(attribute->data.as_string), + &match_list); + + if (match_count <= 0) { + flb_regex_destroy(regex); + + return FLB_FALSE; + } + + + result = flb_regex_parse(regex, + &match_list, + attribute_match_cb, + (void *) span); + + flb_regex_destroy(regex); + + if (result == -1) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int traces_context_contains_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int hash_transformer(void *context, struct cfl_variant *value) +{ + unsigned char digest_buffer[32]; + struct cfl_variant *converted_value; + cfl_sds_t encoded_hash; + int result; + + if (value == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(value, + &converted_value, + CFL_VARIANT_STRING); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + if (cfl_sds_len(converted_value->data.as_string) == 0) { + cfl_variant_destroy(converted_value); + + return FLB_TRUE; + } + + result = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) converted_value->data.as_string, + cfl_sds_len(converted_value->data.as_string), + digest_buffer, + sizeof(digest_buffer)); + + if (result != FLB_CRYPTO_SUCCESS) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + result = hex_encode(digest_buffer, + sizeof(digest_buffer), + &converted_value->data.as_string); + + if (result != FLB_TRUE) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + encoded_hash = cfl_sds_create(converted_value->data.as_string); + + if (encoded_hash == NULL) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + if (value->type == CFL_VARIANT_STRING || + value->type == CFL_VARIANT_BYTES) { + cfl_sds_destroy(value->data.as_string); + } + else if (value->type == CFL_VARIANT_ARRAY) { + cfl_array_destroy(value->data.as_array); + } + else if (value->type == CFL_VARIANT_KVLIST) { + cfl_kvlist_destroy(value->data.as_kvlist); + } + + value->type = CFL_VARIANT_STRING; + value->data.as_string = encoded_hash; + + return FLB_TRUE; +} + +static int traces_context_hash_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_transform_attribute(span, name, hash_transformer) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_remove_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_remove_attribute(span, name) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_update_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_insert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (!span_contains_attribute(span, name) == FLB_TRUE) { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_upsert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + else { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_convert_attribute(struct ctrace *traces_context, + char *name, + char *new_type) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_convert_attribute(span, name, new_type) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_extract_attribute(struct ctrace *traces_context, + char *name, + char *pattern) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_extract_attributes(span, name, pattern) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int delete_attributes(struct ctrace *traces_context, + struct mk_list *attributes) +{ + struct mk_list *iterator; + int result; + struct flb_slist_entry *entry; + + mk_list_foreach(iterator, attributes) { + entry = mk_list_entry(iterator, struct flb_slist_entry, _head); + + result = traces_context_contains_attribute(traces_context, + entry->str); + + if (result == FLB_TRUE) { + result = traces_context_remove_attribute(traces_context, + entry->str); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int update_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_update_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int upsert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_upsert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int convert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_convert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int extract_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_extract_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int insert_attributes(struct ctrace *traces_context, + struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_insert_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int hash_attributes(struct ctrace *traces_context, + struct mk_list *attributes) +{ + struct mk_list *iterator; + int result; + struct flb_slist_entry *entry; + + mk_list_foreach(iterator, attributes) { + entry = mk_list_entry(iterator, struct flb_slist_entry, _head); + + result = traces_context_contains_attribute(traces_context, + entry->str); + + if (result == FLB_TRUE) { + result = traces_context_hash_attribute(traces_context, + entry->str); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_process_traces(struct flb_processor_instance *processor_instance, + struct ctrace *traces_context, + const char *tag, + int tag_len) +{ + struct internal_processor_context *processor_context; + int result; + + processor_context = + (struct internal_processor_context *) processor_instance->context; + + result = delete_attributes(traces_context, + &processor_context->delete_attributes); + + if (result == FLB_PROCESSOR_SUCCESS) { + result = update_attributes(traces_context, + &processor_context->update_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = upsert_attributes(traces_context, + &processor_context->upsert_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = insert_attributes(traces_context, + &processor_context->insert_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = convert_attributes(traces_context, + &processor_context->convert_attributes); + result = FLB_PROCESSOR_SUCCESS; + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = extract_attributes(traces_context, + &processor_context->extract_attributes); + } + + if (result == FLB_PROCESSOR_SUCCESS) { + result = hash_attributes(traces_context, + &processor_context->hash_attributes); + } + + if (result != FLB_PROCESSOR_SUCCESS) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SLIST_1, "update", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + update_list), + "Updates an attribute. Usage : 'update name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "insert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + insert_list), + "Inserts an attribute. Usage : 'insert name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "upsert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + upsert_list), + "Inserts or updates an attribute. Usage : 'upsert name value'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "convert", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + convert_list), + "Converts an attribute. Usage : 'convert name new_type'" + }, + { + FLB_CONFIG_MAP_SLIST_1, "extract", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + extract_list), + "Extracts regular expression match groups as individual attributes. Usage : 'extract (?P<first_word>[^ ]*) (?P<second_word>[^ ]*)'" + }, + { + FLB_CONFIG_MAP_STR, "delete", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + delete_list), + "Deletes an attribute. Usage : 'delete name'" + }, + { + FLB_CONFIG_MAP_STR, "hash", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, + hash_list), + "Replaces an attributes value with its SHA256 hash. Usage : 'hash name'" + }, + + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_attributes_plugin = { + .name = "attributes", + .description = "Modifies metrics attributes", + .cb_init = cb_init, + .cb_process_logs = NULL, + .cb_process_metrics = NULL, + .cb_process_traces = cb_process_traces, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0 +}; |