summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c')
-rw-r--r--src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c621
1 files changed, 621 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c b/src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c
new file mode 100644
index 000000000..5969d1582
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_rewrite_tag/rewrite_tag.c
@@ -0,0 +1,621 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_plugin.h>
+#include <fluent-bit/flb_processor.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <msgpack.h>
+
+#include "rewrite_tag.h"
+
+/* Create an emitter input instance */
+static int emitter_create(struct flb_rewrite_tag *ctx)
+{
+ int ret;
+ struct flb_input_instance *ins;
+
+ ret = flb_input_name_exists(ctx->emitter_name, ctx->config);
+ if (ret == FLB_TRUE) {
+ flb_plg_error(ctx->ins, "emitter_name '%s' already exists",
+ ctx->emitter_name);
+ return -1;
+ }
+
+ ins = flb_input_new(ctx->config, "emitter", NULL, FLB_FALSE);
+ if (!ins) {
+ flb_plg_error(ctx->ins, "cannot create emitter instance");
+ return -1;
+ }
+
+ /* Set the alias name */
+ ret = flb_input_set_property(ins, "alias", ctx->emitter_name);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins,
+ "cannot set emitter_name, using fallback name '%s'",
+ ins->name);
+ }
+
+ /* Set the emitter_mem_buf_limit */
+ if(ctx->emitter_mem_buf_limit > 0) {
+ ins->mem_buf_limit = ctx->emitter_mem_buf_limit;
+ }
+
+ /* Set the storage type */
+ ret = flb_input_set_property(ins, "storage.type",
+ ctx->emitter_storage_type);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot set storage.type");
+ }
+
+ /* Initialize emitter plugin */
+ ret = flb_input_instance_init(ins, ctx->config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot initialize emitter instance '%s'",
+ ins->name);
+ flb_input_instance_exit(ins, ctx->config);
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+
+#ifdef FLB_HAVE_METRICS
+ /* Override Metrics title */
+ ret = flb_metrics_title(ctx->emitter_name, ins->metrics);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "cannot set metrics title, using fallback name %s",
+ ins->name);
+ }
+#endif
+
+ /* Storage context */
+ ret = flb_storage_input_create(ctx->config->cio, ins);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot initialize storage for stream '%s'",
+ ctx->emitter_name);
+ flb_input_instance_exit(ins, ctx->config);
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+ ctx->ins_emitter = ins;
+ return 0;
+}
+
+/*
+ * Validate and prepare internal contexts based on the received
+ * config_map values.
+ */
+static int process_config(struct flb_rewrite_tag *ctx)
+{
+ struct mk_list *head;
+ struct flb_slist_entry *entry;
+ struct rewrite_rule *rule;
+ struct flb_config_map_val *val;
+
+ if (!ctx->cm_rules) {
+ return -1;
+ }
+
+ mk_list_foreach(head, ctx->cm_rules) {
+ /*
+ * When multiple entries are allowed in a config map, this becomes
+ * a list of struct flb_config_map_val. Every entry is linked in the
+ * 'mult' field
+ */
+ val = mk_list_entry(head, struct flb_config_map_val, _head);
+
+ /* Allocate a rule */
+ rule = flb_malloc(sizeof(struct rewrite_rule));
+ if (!rule) {
+ flb_errno();
+ return -1;
+ }
+
+ /* key */
+ entry = flb_slist_entry_get(val->val.list, 0);
+ if (entry == NULL) {
+ flb_plg_error(ctx->ins, "failed to get entry");
+ flb_free(rule);
+ return -1;
+ }
+ rule->ra_key = flb_ra_create(entry->str, FLB_FALSE);
+ if (!rule->ra_key) {
+ flb_plg_error(ctx->ins, "invalid record accessor key ? '%s'",
+ entry->str);
+ flb_free(rule);
+ return -1;
+ }
+
+ /* regex */
+ entry = flb_slist_entry_get(val->val.list, 1);
+ rule->regex = flb_regex_create(entry->str);
+ if (!rule->regex) {
+ flb_plg_error(ctx->ins, "could not compile regex pattern '%s'",
+ entry->str);
+ flb_ra_destroy(rule->ra_key);
+ flb_free(rule);
+ return -1;
+ }
+
+ /* tag */
+ entry = flb_slist_entry_get(val->val.list, 2);
+ rule->ra_tag = flb_ra_create(entry->str, FLB_FALSE);
+
+ if (!rule->ra_tag) {
+ flb_plg_error(ctx->ins, "could not compose tag: %s", entry->str);
+ flb_ra_destroy(rule->ra_key);
+ flb_regex_destroy(rule->regex);
+ flb_free(rule);
+ return -1;
+ }
+
+ /* keep record ? */
+ entry = flb_slist_entry_get(val->val.list, 3);
+ rule->keep_record = flb_utils_bool(entry->str);
+
+ /* Link new rule */
+ mk_list_add(&rule->_head, &ctx->rules);
+ }
+
+ if (mk_list_size(&ctx->rules) == 0) {
+ flb_plg_warn(ctx->ins, "no rules have defined");
+ return 0;
+ }
+
+ return 0;
+}
+
+static int is_wildcard(char* match)
+{
+ size_t len;
+ size_t i;
+
+ if (match == NULL) {
+ return 0;
+ }
+ len = strlen(match);
+
+ /* '***' should be ignored. So we check every char. */
+ for (i=0; i<len; i++) {
+ if (match[i] != '*') {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+static int cb_rewrite_tag_init(struct flb_filter_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ flb_sds_t tmp;
+ flb_sds_t emitter_name = NULL;
+ struct flb_rewrite_tag *ctx;
+ (void) data;
+
+ /* Create context */
+ ctx = flb_calloc(1, sizeof(struct flb_rewrite_tag));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ if (is_wildcard(ins->match)) {
+ flb_plg_warn(ins, "'Match' may cause infinite loop.");
+ }
+ ctx->ins = ins;
+ ctx->config = config;
+ mk_list_init(&ctx->rules);
+
+ /*
+ * Emitter name: every rewrite_tag instance needs an emitter input plugin,
+ * with that one is able to emit records. We use a unique instance so we
+ * can use the metrics interface.
+ *
+ * If not set, we define an emitter name
+ *
+ * Validate if the emitter_name has been set before to check with the
+ * config map. If is not set, do a manual set of the property, so we let the
+ * config map handle the memory allocation.
+ */
+ tmp = (char *) flb_filter_get_property("emitter_name", ins);
+ if (!tmp) {
+ emitter_name = flb_sds_create_size(64);
+ if (!emitter_name) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ tmp = flb_sds_printf(&emitter_name, "emitter_for_%s",
+ flb_filter_name(ins));
+ if (!tmp) {
+ flb_error("[filter rewrite_tag] cannot compose emitter_name");
+ flb_sds_destroy(emitter_name);
+ flb_free(ctx);
+ return -1;
+ }
+
+ flb_filter_set_property(ins, "emitter_name", emitter_name);
+ flb_sds_destroy(emitter_name);
+ }
+
+ /* Set config_map properties in our local context */
+ ret = flb_filter_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ /*
+ * Emitter Storage Type: the emitter input plugin to be created by default
+ * uses memory buffer, this option allows to define a filesystem mechanism
+ * for new records created (only if the main service is also filesystem
+ * enabled).
+ *
+ * On this code we just validate the input type: 'memory' or 'filesystem'.
+ */
+ tmp = ctx->emitter_storage_type;
+ if (strcasecmp(tmp, "memory") != 0 && strcasecmp(tmp, "filesystem") != 0) {
+ flb_plg_error(ins, "invalid 'emitter_storage.type' value. Only "
+ "'memory' or 'filesystem' types are allowed");
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Set plugin context */
+ flb_filter_set_context(ins, ctx);
+
+ /* Process the configuration */
+ ret = process_config(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Create the emitter context */
+ ret = emitter_create(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Register a metric to count the number of emitted records */
+#ifdef FLB_HAVE_METRICS
+ ctx->cmt_emitted = cmt_counter_create(ins->cmt,
+ "fluentbit", "filter", "emit_records_total",
+ "Total number of emitted records",
+ 1, (char *[]) {"name"});
+
+ /* OLD api */
+ flb_metrics_add(FLB_RTAG_METRIC_EMITTED,
+ "emit_records", ctx->ins->metrics);
+#endif
+
+ return 0;
+}
+
+static int ingest_inline(struct flb_rewrite_tag *ctx,
+ flb_sds_t out_tag,
+ const void *buf, size_t buf_size)
+{
+ struct flb_input_instance *input_instance;
+ struct flb_processor_unit *processor_unit;
+ struct flb_processor *processor;
+ int result;
+
+ if (ctx->ins->parent_processor != NULL) {
+ processor_unit = (struct flb_processor_unit *) \
+ ctx->ins->parent_processor;
+ processor = (struct flb_processor *) processor_unit->parent;
+ input_instance = (struct flb_input_instance *) processor->data;
+
+ if (processor->source_plugin_type == FLB_PLUGIN_INPUT) {
+ result = flb_input_log_append_skip_processor_stages(
+ input_instance,
+ processor_unit->stage + 1,
+ out_tag, flb_sds_len(out_tag),
+ buf, buf_size);
+
+ if (result == 0) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+
+/*
+ * On given record, check if a rule applies or not to the map, if so, compose
+ * the new tag, emit the record and return FLB_TRUE, otherwise just return
+ * FLB_FALSE and the original record will remain.
+ */
+static int process_record(const char *tag, int tag_len, msgpack_object map,
+ const void *buf, size_t buf_size, int *keep,
+ struct flb_rewrite_tag *ctx, int *matched)
+{
+ int ret;
+ flb_sds_t out_tag;
+ struct mk_list *head;
+ struct rewrite_rule *rule = NULL;
+ struct flb_regex_search result = {0};
+
+ if (matched == NULL) {
+ return FLB_FALSE;
+ }
+ *matched = FLB_FALSE;
+
+ mk_list_foreach(head, &ctx->rules) {
+ rule = mk_list_entry(head, struct rewrite_rule, _head);
+ if (rule) {
+ *keep = rule->keep_record;
+ }
+ ret = flb_ra_regex_match(rule->ra_key, map, rule->regex, &result);
+ if (ret < 0) { /* no match */
+ rule = NULL;
+ continue;
+ }
+
+ /* A record matched, just break and check 'rule' */
+ break;
+ }
+
+ if (!rule) {
+ return FLB_FALSE;
+ }
+ *matched = FLB_TRUE;
+
+ /* Compose new tag */
+ out_tag = flb_ra_translate(rule->ra_tag, (char *) tag, tag_len, map, &result);
+
+ /* Release any capture info from 'results' */
+ flb_regex_results_release(&result);
+
+ /* Validate new outgoing tag */
+ if (!out_tag) {
+ return FLB_FALSE;
+ }
+
+ ret = ingest_inline(ctx, out_tag, buf, buf_size);
+
+ if (!ret) {
+ /* Emit record with new tag */
+ ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size,
+ ctx->ins_emitter);
+ }
+ else {
+ ret = 0;
+ }
+
+ /* Release the tag */
+ flb_sds_destroy(out_tag);
+
+ if (ret == -1) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static int cb_rewrite_tag_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ int keep;
+ int emitted_num = 0;
+ int is_matched = FLB_FALSE;
+ int is_emitted = FLB_FALSE;
+ size_t pre = 0;
+ size_t off = 0;
+#ifdef FLB_HAVE_METRICS
+ uint64_t ts;
+ char *name;
+#endif
+ msgpack_object map;
+ struct flb_rewrite_tag *ctx;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ (void) config;
+ (void) i_ins;
+
+ ctx = (struct flb_rewrite_tag *) filter_context;
+
+#ifdef FLB_HAVE_METRICS
+ ts = cfl_time_now();
+ name = (char *) flb_filter_name(f_ins);
+#endif
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ off = log_decoder.offset;
+ map = *log_event.body;
+ is_matched = FLB_FALSE;
+ /*
+ * Process the record according the defined rules. If it returns FLB_TRUE means
+ * the record was emitter with a different tag.
+ *
+ * If a record was emitted, the variable 'keep' will define if the record must
+ * be preserved or not.
+ */
+ is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched);
+ if (is_emitted == FLB_TRUE) {
+ /* A record with the new tag was emitted */
+ emitted_num++;
+ }
+
+ /*
+ * Here we decide if the original record must be preserved or not:
+ *
+ * - record with new tag was emitted and the rule says it must be preserved
+ * - record was not emitted
+ */
+ if (keep == FLB_TRUE || is_matched != FLB_TRUE) {
+ ret = flb_log_event_encoder_emit_raw_record(
+ &log_encoder,
+ log_decoder.record_base,
+ log_decoder.record_length);
+ }
+
+ /* Adjust previous offset */
+ pre = off;
+ }
+
+ if (emitted_num == 0) {
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+#ifdef FLB_HAVE_METRICS
+ else if (emitted_num > 0) {
+ cmt_counter_add(ctx->cmt_emitted, ts, emitted_num,
+ 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_RTAG_METRIC_EMITTED, emitted_num, ctx->ins->metrics);
+ }
+#endif
+
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_bytes = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+}
+
+/* Destroy rules from context */
+static void destroy_rules(struct flb_rewrite_tag *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct rewrite_rule *rule;
+
+ mk_list_foreach_safe(head, tmp, &ctx->rules) {
+ rule = mk_list_entry(head, struct rewrite_rule, _head);
+ flb_regex_destroy(rule->regex);
+ flb_ra_destroy(rule->ra_key);
+ flb_ra_destroy(rule->ra_tag);
+ mk_list_del(&rule->_head);
+ flb_free(rule);
+ }
+}
+
+static int cb_rewrite_tag_exit(void *data, struct flb_config *config)
+{
+ struct flb_rewrite_tag *ctx = (struct flb_rewrite_tag *) data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ destroy_rules(ctx);
+ flb_free(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_SLIST_4, "rule", NULL,
+ FLB_TRUE, FLB_TRUE, offsetof(struct flb_rewrite_tag, cm_rules),
+ NULL
+ },
+ {
+ FLB_CONFIG_MAP_STR, "emitter_name", NULL,
+ FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_name),
+ NULL
+ },
+ {
+ FLB_CONFIG_MAP_STR, "emitter_storage.type", "memory",
+ FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_type),
+ NULL
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_RTAG_MEM_BUF_LIMIT_DEFAULT,
+ FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit),
+ "set a memory buffer limit to restrict memory usage of emitter"
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_filter_plugin filter_rewrite_tag_plugin = {
+ .name = "rewrite_tag",
+ .description = "Rewrite records tags",
+ .cb_init = cb_rewrite_tag_init,
+ .cb_filter = cb_rewrite_tag_filter,
+ .cb_exit = cb_rewrite_tag_exit,
+ .config_map = config_map,
+ .flags = 0
+};