diff options
Diffstat (limited to 'src/fluent-bit/plugins/filter_record_modifier')
3 files changed, 603 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_record_modifier/CMakeLists.txt b/src/fluent-bit/plugins/filter_record_modifier/CMakeLists.txt new file mode 100644 index 000000000..d971fa203 --- /dev/null +++ b/src/fluent-bit/plugins/filter_record_modifier/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + filter_modifier.c) + +FLB_PLUGIN(filter_record_modifier "${src}" "") diff --git a/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.c b/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.c new file mode 100644 index 000000000..d95066a19 --- /dev/null +++ b/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.c @@ -0,0 +1,531 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_filter_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <msgpack.h> +#include "filter_modifier.h" + +#define PLUGIN_NAME "filter_record_modifier" + +static int config_allowlist_key(struct record_modifier_ctx *ctx, + struct mk_list *list) +{ + struct modifier_key *mod_key = NULL; + struct mk_list *head = NULL; + struct flb_config_map_val *mv = NULL; + + if (ctx == NULL || list == NULL) { + return -1; + } + + flb_config_map_foreach(head, mv, list) { + mod_key = flb_malloc(sizeof(struct modifier_key)); + if (!mod_key) { + flb_errno(); + continue; + } + mod_key->key = mv->val.str; + mod_key->key_len = flb_sds_len(mv->val.str); + if (mod_key->key[mod_key->key_len - 1] == '*') { + mod_key->dynamic_key = FLB_TRUE; + mod_key->key_len--; + } + else { + mod_key->dynamic_key = FLB_FALSE; + } + mk_list_add(&mod_key->_head, &ctx->allowlist_keys); + ctx->allowlist_keys_num++; + } + return 0; +} + +static int configure(struct record_modifier_ctx *ctx, + struct flb_filter_instance *f_ins) +{ + struct mk_list *head = NULL; + struct modifier_key *mod_key; + struct modifier_record *mod_record; + struct flb_config_map_val *mv; + struct flb_slist_entry *sentry = NULL; + + ctx->records_num = 0; + ctx->remove_keys_num = 0; + ctx->allowlist_keys_num = 0; + + if (flb_filter_config_map_set(f_ins, ctx) < 0) { + flb_errno(); + flb_plg_error(f_ins, "configuration error"); + return -1; + } + + /* Check 'Record' properties */ + flb_config_map_foreach(head, mv, ctx->records_map) { + mod_record = flb_malloc(sizeof(struct modifier_record)); + if (!mod_record) { + flb_errno(); + continue; + } + + if (mk_list_size(mv->val.list) != 2) { + flb_plg_error(ctx->ins, "invalid record parameters, " + "expects 'KEY VALUE'"); + flb_free(mod_record); + continue; + } + /* Get first value (field) */ + sentry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + mod_record->key_len = flb_sds_len(sentry->str); + mod_record->key = flb_strndup(sentry->str, mod_record->key_len); + if (mod_record->key == NULL) { + flb_errno(); + flb_free(mod_record); + continue; + } + + sentry = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + mod_record->val_len = flb_sds_len(sentry->str); + mod_record->val = flb_strndup(sentry->str, mod_record->val_len); + if (mod_record->val == NULL) { + flb_errno(); + flb_free(mod_record->key); + flb_free(mod_record); + continue; + } + + mk_list_add(&mod_record->_head, &ctx->records); + ctx->records_num++; + } + /* Check "Remove_Key" properties */ + flb_config_map_foreach(head, mv, ctx->remove_keys_map) { + mod_key = flb_malloc(sizeof(struct modifier_key)); + if (!mod_key) { + flb_errno(); + continue; + } + mod_key->key = mv->val.str; + mod_key->key_len = flb_sds_len(mv->val.str); + if (mod_key->key[mod_key->key_len - 1] == '*') { + mod_key->dynamic_key = FLB_TRUE; + mod_key->key_len--; + } + else { + mod_key->dynamic_key = FLB_FALSE; + } + mk_list_add(&mod_key->_head, &ctx->remove_keys); + ctx->remove_keys_num++; + } + + /* Check "Allowlist_key" and "Whitelist_key" properties */ + config_allowlist_key(ctx, ctx->allowlist_keys_map); + config_allowlist_key(ctx, ctx->whitelist_keys_map); + + if (ctx->remove_keys_num > 0 && ctx->allowlist_keys_num > 0) { + flb_plg_error(ctx->ins, "remove_keys and allowlist_keys are exclusive " + "with each other."); + return -1; + } + return 0; +} + +static int delete_list(struct record_modifier_ctx *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct modifier_key *key; + struct modifier_record *record; + + mk_list_foreach_safe(head, tmp, &ctx->remove_keys) { + key = mk_list_entry(head, struct modifier_key, _head); + mk_list_del(&key->_head); + flb_free(key); + } + mk_list_foreach_safe(head, tmp, &ctx->allowlist_keys) { + key = mk_list_entry(head, struct modifier_key, _head); + mk_list_del(&key->_head); + flb_free(key); + } + mk_list_foreach_safe(head, tmp, &ctx->records) { + record = mk_list_entry(head, struct modifier_record, _head); + flb_free(record->key); + flb_free(record->val); + mk_list_del(&record->_head); + flb_free(record); + } + return 0; +} + + +static int cb_modifier_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + struct record_modifier_ctx *ctx = NULL; + + /* Create context */ + ctx = flb_calloc(1, sizeof(struct record_modifier_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + mk_list_init(&ctx->records); + mk_list_init(&ctx->remove_keys); + mk_list_init(&ctx->allowlist_keys); + ctx->ins = f_ins; + + if ( configure(ctx, f_ins) < 0 ){ + delete_list(ctx); + flb_free(ctx); + return -1; + } + + flb_filter_set_context(f_ins, ctx); + + return 0; +} + +static int make_bool_map(struct record_modifier_ctx *ctx, msgpack_object *map, + bool_map_t *bool_map, int map_num) +{ + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *check = NULL; + msgpack_object_kv *kv; + struct modifier_key *mod_key; + + char result; + char is_to_delete; + msgpack_object *key; + int ret = map_num; + int i; + + for (i=0; i<map_num; i++) { + bool_map[i] = TO_BE_REMAINED; + } + bool_map[map_num] = TAIL_OF_ARRAY;/* tail of map */ + + if (ctx->remove_keys_num > 0) { + check = &(ctx->remove_keys); + is_to_delete = FLB_TRUE; + } + else if(ctx->allowlist_keys_num > 0) { + check = &(ctx->allowlist_keys); + is_to_delete = FLB_FALSE; + } + + if (check != NULL){ + kv = map->via.map.ptr; + for(i=0; i<map_num; i++){ + key = &(kv+i)->key; + result = FLB_FALSE; + + mk_list_foreach_safe(head, tmp, check) { + mod_key = mk_list_entry(head, struct modifier_key, _head); + if (key->via.bin.size != mod_key->key_len && + key->via.str.size != mod_key->key_len && + mod_key->dynamic_key == FLB_FALSE) { + continue; + } + if (key->via.bin.size < mod_key->key_len && + key->via.str.size < mod_key->key_len && + mod_key->dynamic_key == FLB_TRUE) { + continue; + } + if ((key->type == MSGPACK_OBJECT_BIN && + !strncasecmp(key->via.bin.ptr, mod_key->key, + mod_key->key_len)) || + (key->type == MSGPACK_OBJECT_STR && + !strncasecmp(key->via.str.ptr, mod_key->key, + mod_key->key_len)) + ) { + result = FLB_TRUE; + break; + } + } + if (result == is_to_delete) { + bool_map[i] = TO_BE_REMOVED; + ret--; + } + } + } + + return ret; +} + +static int create_uuid(struct record_modifier_ctx *ctx, char *uuid) +{ + int ret; + + if (uuid == NULL) { + return -1; + } + + ret = flb_utils_uuid_v4_gen(uuid); + if (ret < 0) { + flb_plg_error(ctx->ins, "failed to append uuid"); + return -1; + } + return 0; +} + +#define BOOL_MAP_LIMIT 65535 +static int cb_modifier_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_size, + struct flb_filter_instance *f_ins, + struct flb_input_instance *i_ins, + void *context, + struct flb_config *config) +{ + struct record_modifier_ctx *ctx = context; + char is_modified = FLB_FALSE; + int i; + int removed_map_num = 0; + int map_num = 0; + int ret; + char uuid[40] = {0}; + size_t uuid_len = 0; + bool_map_t *bool_map = NULL; + struct flb_time tm; + struct modifier_record *mod_rec; + msgpack_object *obj; + msgpack_object_kv *kv; + struct mk_list *tmp; + struct mk_list *head; + struct flb_log_event_encoder log_encoder; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + (void) f_ins; + (void) i_ins; + (void) config; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return FLB_FILTER_NOTOUCH; + } + + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FILTER_NOTOUCH; + } + + /* Iterate each item to know map number */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map_num = 0; + removed_map_num = 0; + uuid_len = 0; + + if (bool_map != NULL) { + flb_free(bool_map); + bool_map = NULL; + } + + flb_time_copy(&tm, &log_event.timestamp); + obj = log_event.body; + + /* grep keys */ + if (obj->type == MSGPACK_OBJECT_MAP) { + map_num = obj->via.map.size; + if (map_num > BOOL_MAP_LIMIT) { + flb_plg_error(ctx->ins, "The number of elements exceeds limit %d", + BOOL_MAP_LIMIT); + return -1; + } + /* allocate map_num + guard byte */ + bool_map = flb_calloc(map_num+1, sizeof(bool_map_t)); + if (bool_map == NULL) { + flb_errno(); + return -1; + } + removed_map_num = make_bool_map(ctx, obj, + bool_map, obj->via.map.size); + } + else { + continue; + } + + if (removed_map_num != map_num) { + is_modified = FLB_TRUE; + } + + removed_map_num += ctx->records_num; + if (ctx->uuid_key) { + memset(&uuid[0], 0, sizeof(uuid)); + ret = create_uuid(ctx, &uuid[0]); + if (ret == 0) { + removed_map_num++; + uuid_len = strlen(&uuid[0]); + } + } + if (removed_map_num <= 0) { + continue; + } + + ret = flb_log_event_encoder_begin_record(&log_encoder); + + ret = flb_log_event_encoder_set_timestamp(&log_encoder, &tm); + + ret = flb_log_event_encoder_set_metadata_from_msgpack_object( + &log_encoder, log_event.metadata); + + kv = obj->via.map.ptr; + for(i=0; + bool_map[i] != TAIL_OF_ARRAY && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + if (bool_map[i] == TO_BE_REMAINED) { + ret = flb_log_event_encoder_append_body_values( + &log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); + } + } + + flb_free(bool_map); + bool_map = NULL; + + /* append record */ + if (ctx->records_num > 0) { + is_modified = FLB_TRUE; + + mk_list_foreach_safe(head, tmp, &ctx->records) { + mod_rec = mk_list_entry(head, struct modifier_record, _head); + + ret = flb_log_event_encoder_append_body_values( + &log_encoder, + FLB_LOG_EVENT_STRING_VALUE(mod_rec->key, mod_rec->key_len), + FLB_LOG_EVENT_STRING_VALUE(mod_rec->val, mod_rec->val_len)); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + } + } + + if (uuid_len > 0) { + is_modified = FLB_TRUE; + + ret = flb_log_event_encoder_append_body_values( + &log_encoder, + FLB_LOG_EVENT_STRING_VALUE(ctx->uuid_key, flb_sds_len(ctx->uuid_key)), + FLB_LOG_EVENT_STRING_VALUE(&uuid[0], uuid_len)); + } + + flb_log_event_encoder_commit_record(&log_encoder); + } + + if (bool_map != NULL) { + flb_free(bool_map); + } + + if (is_modified && + log_encoder.output_length > 0) { + *out_buf = log_encoder.output_buffer; + *out_size = log_encoder.output_length; + + ret = FLB_FILTER_MODIFIED; + + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + } + else { + ret = FLB_FILTER_NOTOUCH; + } + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return ret; +} + +static int cb_modifier_exit(void *data, struct flb_config *config) +{ + struct record_modifier_ctx *ctx = data; + + if (ctx != NULL) { + delete_list(ctx); + flb_free(ctx); + } + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SLIST_2, "record", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, records_map), + "Append fields. This parameter needs key and value pair." + }, + + { + FLB_CONFIG_MAP_STR, "remove_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, remove_keys_map), + "If the key is matched, that field is removed." + }, + { + FLB_CONFIG_MAP_STR, "allowlist_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, allowlist_keys_map), + "If the key is not matched, that field is removed." + }, + { + FLB_CONFIG_MAP_STR, "whitelist_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, whitelist_keys_map), + "(Alias of allowlist_key)" + }, + + { + FLB_CONFIG_MAP_STR, "uuid_key", NULL, + 0, FLB_TRUE, offsetof(struct record_modifier_ctx, uuid_key), + "If set, the plugin generates uuid per record." + }, + + {0} +}; + +struct flb_filter_plugin filter_record_modifier_plugin = { + .name = "record_modifier", + .description = "modify record", + .cb_init = cb_modifier_init, + .cb_filter = cb_modifier_filter, + .cb_exit = cb_modifier_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.h b/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.h new file mode 100644 index 000000000..b9d0818ef --- /dev/null +++ b/src/fluent-bit/plugins/filter_record_modifier/filter_modifier.h @@ -0,0 +1,68 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_FILTER_RECORD_MODIFIER_H +#define FLB_FILTER_RECORD_MODIFIER_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_filter.h> + +struct modifier_record { + char *key; + char *val; + int key_len; + int val_len; + struct mk_list _head; +}; + +struct modifier_key { + char *key; + int key_len; + int dynamic_key; + struct mk_list _head; +}; + +struct record_modifier_ctx { + int records_num; + int remove_keys_num; + int allowlist_keys_num; + + flb_sds_t uuid_key; + + /* config map */ + struct mk_list *records_map; + struct mk_list *remove_keys_map; + struct mk_list *allowlist_keys_map; + struct mk_list *whitelist_keys_map; + + struct mk_list records; + struct mk_list remove_keys; + struct mk_list allowlist_keys; + struct flb_filter_instance *ins; +}; + +typedef enum { + TO_BE_REMOVED = 0, + TO_BE_REMAINED = 1, + TAIL_OF_ARRAY = 2 +} bool_map_t; + + +#endif /* FLB_FILTER_RECORD_MODIFIER_H */ |