summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_filter.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /fluent-bit/src/flb_filter.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_filter.c')
-rw-r--r--fluent-bit/src/flb_filter.c669
1 files changed, 0 insertions, 669 deletions
diff --git a/fluent-bit/src/flb_filter.c b/fluent-bit/src/flb_filter.c
deleted file mode 100644
index 389709a9a..000000000
--- a/fluent-bit/src/flb_filter.c
+++ /dev/null
@@ -1,669 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_filter.h>
-#include <fluent-bit/flb_str.h>
-#include <fluent-bit/flb_env.h>
-#include <fluent-bit/flb_router.h>
-#include <fluent-bit/flb_mp.h>
-#include <fluent-bit/flb_kv.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_metrics.h>
-#include <fluent-bit/flb_utils.h>
-#include <chunkio/chunkio.h>
-
-#ifdef FLB_HAVE_CHUNK_TRACE
-#include <fluent-bit/flb_chunk_trace.h>
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
-static inline int instance_id(struct flb_config *config)
-{
- struct flb_filter_instance *entry;
-
- if (mk_list_size(&config->filters) == 0) {
- return 0;
- }
-
- entry = mk_list_entry_last(&config->filters, struct flb_filter_instance,
- _head);
- return (entry->id + 1);
-}
-
-static int is_active(struct mk_list *in_properties)
-{
- struct mk_list *head;
- struct flb_kv *kv;
-
- mk_list_foreach(head, in_properties) {
- kv = mk_list_entry(head, struct flb_kv, _head);
- if (strcasecmp(kv->key, "active") == 0) {
- /* Skip checking deactivation ... */
- if (strcasecmp(kv->val, "FALSE") == 0 || strcmp(kv->val, "0") == 0) {
- return FLB_FALSE;
- }
- }
- }
- return FLB_TRUE;
-}
-
-static inline int prop_key_check(const char *key, const char *kv, int k_len)
-{
- int len;
-
- len = strlen(key);
- if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
- return 0;
- }
-
- return -1;
-}
-
-void flb_filter_do(struct flb_input_chunk *ic,
- const void *data, size_t bytes,
- const char *tag, int tag_len,
- struct flb_config *config)
-{
- int ret;
-#ifdef FLB_HAVE_METRICS
- int in_records = 0;
- int out_records = 0;
- int diff = 0;
- int pre_records = 0;
- uint64_t ts;
- char *name;
-#endif
- char *ntag;
- const char *work_data;
- size_t work_size;
- void *out_buf;
- size_t cur_size;
- size_t out_size;
- ssize_t content_size;
- ssize_t write_at;
- struct mk_list *head;
- struct flb_filter_instance *f_ins;
- struct flb_input_instance *i_ins = ic->in;
-/* measure time between filters for chunk traces. */
-#ifdef FLB_HAVE_CHUNK_TRACE
- struct flb_time tm_start;
- struct flb_time tm_finish;
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- /* For the incoming Tag make sure to create a NULL terminated reference */
- ntag = flb_malloc(tag_len + 1);
- if (!ntag) {
- flb_errno();
- flb_error("[filter] could not filter record due to memory problems");
- return;
- }
- memcpy(ntag, tag, tag_len);
- ntag[tag_len] = '\0';
-
- work_data = (const char *) data;
- work_size = bytes;
-
-#ifdef FLB_HAVE_METRICS
- /* timestamp */
- ts = cfl_time_now();
-
- /* Count number of incoming records */
- in_records = ic->added_records;
- pre_records = ic->total_records - in_records;
-#endif
-
- /* Iterate filters */
- mk_list_foreach(head, &config->filters) {
- f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
- if (is_active(&f_ins->properties) == FLB_FALSE) {
- continue;
- }
- if (flb_router_match(ntag, tag_len, f_ins->match
-#ifdef FLB_HAVE_REGEX
- , f_ins->match_regex
-#else
- , NULL
-#endif
- )) {
- /* Reset filtered buffer */
- out_buf = NULL;
- out_size = 0;
-
- content_size = cio_chunk_get_content_size(ic->chunk);
-
- /* where to position the new content if modified ? */
- write_at = (content_size - work_size);
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace) {
- flb_time_get(&tm_start);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
- /* Invoke the filter callback */
- ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */
- work_size, /* msgpack size */
- ntag, tag_len, /* input tag */
- &out_buf, /* new data */
- &out_size, /* new data size */
- f_ins, /* filter instance */
- i_ins, /* input instance */
- f_ins->context, /* filter priv data */
- config);
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace) {
- flb_time_get(&tm_finish);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
-#ifdef FLB_HAVE_METRICS
- name = (char *) flb_filter_name(f_ins);
-
- cmt_counter_add(f_ins->cmt_records, ts, in_records,
- 1, (char *[]) {name});
- cmt_counter_add(f_ins->cmt_bytes, ts, content_size,
- 1, (char *[]) {name});
-
- flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics);
- flb_metrics_sum(FLB_METRIC_N_BYTES, content_size, f_ins->metrics);
-#endif
-
- /* Override buffer just if it was modified */
- if (ret == FLB_FILTER_MODIFIED) {
- /* all records removed, no data to continue processing */
- if (out_size == 0) {
- /* reset data content length */
- flb_input_chunk_write_at(ic, write_at, "", 0);
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace) {
- flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, "", 0);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
-
-#ifdef FLB_HAVE_METRICS
- ic->total_records = pre_records;
-
- /* cmetrics */
- cmt_counter_add(f_ins->cmt_drop_records, ts, in_records,
- 1, (char *[]) {name});
-
- /* [OLD] Summarize all records removed */
- flb_metrics_sum(FLB_METRIC_N_DROPPED,
- in_records, f_ins->metrics);
-#endif
- break;
- }
- else {
-#ifdef FLB_HAVE_METRICS
- out_records = flb_mp_count(out_buf, out_size);
- if (out_records > in_records) {
- diff = (out_records - in_records);
-
- /* cmetrics */
- cmt_counter_add(f_ins->cmt_add_records, ts, diff,
- 1, (char *[]) {name});
-
- /* [OLD] Summarize new records */
- flb_metrics_sum(FLB_METRIC_N_ADDED,
- diff, f_ins->metrics);
- }
- else if (out_records < in_records) {
- diff = (in_records - out_records);
-
- /* cmetrics */
- cmt_counter_add(f_ins->cmt_drop_records, ts, diff,
- 1, (char *[]) {name});
-
- /* [OLD] Summarize dropped records */
- flb_metrics_sum(FLB_METRIC_N_DROPPED,
- diff, f_ins->metrics);
- }
-
- /* set number of records in new chunk */
- in_records = out_records;
- ic->total_records = pre_records + in_records;
-#endif
- }
- ret = flb_input_chunk_write_at(ic, write_at,
- out_buf, out_size);
- if (ret == -1) {
- flb_error("[filter] could not write data to storage. "
- "Skipping filtering.");
- flb_free(out_buf);
- continue;
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace) {
- flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, out_buf, out_size);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- /* Point back the 'data' pointer to the new address */
- ret = cio_chunk_get_content(ic->chunk,
- (char **) &work_data, &cur_size);
- if (ret != CIO_OK) {
- flb_error("[filter] error retrieving data chunk");
- }
- else {
- work_data += (cur_size - out_size);
- work_size = out_size;
- }
- flb_free(out_buf);
- }
- }
- }
-
- flb_free(ntag);
-}
-
-int flb_filter_set_property(struct flb_filter_instance *ins,
- const char *k, const char *v)
-{
- int len;
- int ret;
- flb_sds_t tmp;
- struct flb_kv *kv;
-
- len = strlen(k);
- tmp = flb_env_var_translate(ins->config->env, v);
- if (!tmp) {
- return -1;
- }
-
- /* Check if the key is a known/shared property */
-#ifdef FLB_HAVE_REGEX
- if (prop_key_check("match_regex", k, len) == 0) {
- ins->match_regex = flb_regex_create(tmp);
- flb_sds_destroy(tmp);
- }
- else
-#endif
- if (prop_key_check("match", k, len) == 0) {
- flb_utils_set_plugin_string_property("match", &ins->match, tmp);
- }
- else if (prop_key_check("alias", k, len) == 0 && tmp) {
- flb_utils_set_plugin_string_property("alias", &ins->alias, tmp);
- }
- else if (prop_key_check("log_level", k, len) == 0 && tmp) {
- ret = flb_log_get_level_str(tmp);
- flb_sds_destroy(tmp);
- if (ret == -1) {
- return -1;
- }
- ins->log_level = ret;
- }
- else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) {
- ret = flb_utils_time_to_seconds(tmp);
- flb_sds_destroy(tmp);
- if (ret == -1) {
- return -1;
- }
- ins->log_suppress_interval = ret;
- }
- else {
- /*
- * Create the property, we don't pass the value since we will
- * map it directly to avoid an extra memory allocation.
- */
- kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
- if (!kv) {
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- return -1;
- }
- kv->val = tmp;
- }
-
- return 0;
-}
-
-const char *flb_filter_get_property(const char *key,
- struct flb_filter_instance *ins)
-{
- return flb_kv_get_key_value(key, &ins->properties);
-}
-
-void flb_filter_instance_exit(struct flb_filter_instance *ins,
- struct flb_config *config)
-{
- struct flb_filter_plugin *p;
-
- p = ins->p;
- if (p->cb_exit && ins->context) {
- p->cb_exit(ins->context, config);
- }
-}
-
-/* Invoke exit call for the filter plugin */
-void flb_filter_exit(struct flb_config *config)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_filter_instance *ins;
- struct flb_filter_plugin *p;
-
- mk_list_foreach_safe(head, tmp, &config->filters) {
- ins = mk_list_entry(head, struct flb_filter_instance, _head);
- p = ins->p;
- if (!p) {
- continue;
- }
- flb_filter_instance_exit(ins, config);
- flb_filter_instance_destroy(ins);
- }
-}
-
-struct flb_filter_instance *flb_filter_new(struct flb_config *config,
- const char *filter, void *data)
-{
- int id;
- struct mk_list *head;
- struct flb_filter_plugin *plugin;
- struct flb_filter_instance *instance = NULL;
-
- if (!filter) {
- return NULL;
- }
-
- mk_list_foreach(head, &config->filter_plugins) {
- plugin = mk_list_entry(head, struct flb_filter_plugin, _head);
- if (strcasecmp(plugin->name, filter) == 0) {
- break;
- }
- plugin = NULL;
- }
-
- if (!plugin) {
- return NULL;
- }
-
- instance = flb_calloc(1, sizeof(struct flb_filter_instance));
- if (!instance) {
- flb_errno();
- return NULL;
- }
- instance->config = config;
-
- /*
- * Initialize event type, if not set, default to FLB_FILTER_LOGS. Note that a
- * zero value means it's undefined.
- */
- if (plugin->event_type == 0) {
- instance->event_type = FLB_FILTER_LOGS;
- }
- else {
- instance->event_type = plugin->event_type;
- }
-
- /* Get an ID */
- id = instance_id(config);
-
- /* format name (with instance id) */
- snprintf(instance->name, sizeof(instance->name) - 1,
- "%s.%i", plugin->name, id);
-
- instance->id = id;
- instance->alias = NULL;
- instance->p = plugin;
- instance->data = data;
- instance->match = NULL;
-#ifdef FLB_HAVE_REGEX
- instance->match_regex = NULL;
-#endif
- instance->log_level = -1;
- instance->log_suppress_interval = -1;
-
- mk_list_init(&instance->properties);
- mk_list_add(&instance->_head, &config->filters);
-
- return instance;
-}
-
-/* Return an instance name or alias */
-const char *flb_filter_name(struct flb_filter_instance *ins)
-{
- if (ins->alias) {
- return ins->alias;
- }
-
- return ins->name;
-}
-
-int flb_filter_plugin_property_check(struct flb_filter_instance *ins,
- struct flb_config *config)
-{
- int ret = 0;
- struct mk_list *config_map;
- struct flb_filter_plugin *p = ins->p;
-
- if (p->config_map) {
- /*
- * Create a dynamic version of the configmap that will be used by the specific
- * instance in question.
- */
- config_map = flb_config_map_create(config, p->config_map);
- if (!config_map) {
- flb_error("[filter] error loading config map for '%s' plugin",
- p->name);
- return -1;
- }
- ins->config_map = config_map;
-
- /* Validate incoming properties against config map */
- ret = flb_config_map_properties_check(ins->p->name,
- &ins->properties, ins->config_map);
- if (ret == -1) {
- if (config->program_name) {
- flb_helper("try the command: %s -F %s -h\n",
- config->program_name, ins->p->name);
- }
- return -1;
- }
- }
-
- return 0;
-}
-
-int flb_filter_match_property_existence(struct flb_filter_instance *ins)
-{
- if (!ins->match
-#ifdef FLB_HAVE_REGEX
- && !ins->match_regex
-#endif
- ) {
- return FLB_FALSE;
- }
-
- return FLB_TRUE;
-}
-
-int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins)
-{
- int ret;
- uint64_t ts;
- char *name;
- struct flb_filter_plugin *p;
-
- if (flb_filter_match_property_existence(ins) == FLB_FALSE) {
- flb_warn("[filter] NO match rule for %s filter instance, unloading.",
- ins->name);
- return -1;
- }
-
- if (ins->log_level == -1 && config->log) {
- ins->log_level = config->log->level;
- }
-
- p = ins->p;
-
- /* Get name or alias for the instance */
- name = (char *) flb_filter_name(ins);
- ts = cfl_time_now();
-
- /* CMetrics */
- ins->cmt = cmt_create();
- if (!ins->cmt) {
- flb_error("[filter] could not create cmetrics context: %s",
- flb_filter_name(ins));
- return -1;
- }
-
- /* Register generic filter plugin metrics */
- ins->cmt_records = cmt_counter_create(ins->cmt,
- "fluentbit", "filter",
- "records_total",
- "Total number of new records processed.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name});
-
- /* Register generic filter plugin metrics */
- ins->cmt_bytes = cmt_counter_create(ins->cmt,
- "fluentbit", "filter",
- "bytes_total",
- "Total number of new bytes processed.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_bytes, ts, 0, 1, (char *[]) {name});
-
- /* Register generic filter plugin metrics */
- ins->cmt_add_records = cmt_counter_create(ins->cmt,
- "fluentbit", "filter",
- "add_records_total",
- "Total number of new added records.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_add_records, ts, 0, 1, (char *[]) {name});
-
- /* Register generic filter plugin metrics */
- ins->cmt_drop_records = cmt_counter_create(ins->cmt,
- "fluentbit", "filter",
- "drop_records_total",
- "Total number of dropped records.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_drop_records, ts, 0, 1, (char *[]) {name});
-
- /* OLD Metrics API */
-#ifdef FLB_HAVE_METRICS
-
- /* Create the metrics context */
- ins->metrics = flb_metrics_create(name);
- if (!ins->metrics) {
- flb_warn("[filter] cannot initialize metrics for %s filter, "
- "unloading.", name);
- return -1;
- }
-
- /* Register filter metrics */
- flb_metrics_add(FLB_METRIC_N_DROPPED, "drop_records", ins->metrics);
- flb_metrics_add(FLB_METRIC_N_ADDED, "add_records", ins->metrics);
- flb_metrics_add(FLB_METRIC_N_RECORDS, "records", ins->metrics);
- flb_metrics_add(FLB_METRIC_N_BYTES, "bytes", ins->metrics);
-#endif
-
- /*
- * Before to call the initialization callback, make sure that the received
- * configuration parameters are valid if the plugin is registering a config map.
- */
- if (flb_filter_plugin_property_check(ins, config) == -1) {
- return -1;
- }
-
- if (is_active(&ins->properties) == FLB_FALSE) {
- return 0;
- }
-
- /* Initialize the input */
- if (p->cb_init) {
- ret = p->cb_init(ins, config, ins->data);
- if (ret != 0) {
- flb_error("Failed initialize filter %s", ins->name);
- return -1;
- }
- }
-
- return 0;
-}
-
-/* Initialize all filter plugins */
-int flb_filter_init_all(struct flb_config *config)
-{
- int ret;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_filter_instance *ins;
-
- /* Iterate all active filter instance plugins */
- mk_list_foreach_safe(head, tmp, &config->filters) {
- ins = mk_list_entry(head, struct flb_filter_instance, _head);
- ret = flb_filter_init(config, ins);
- if (ret == -1) {
- flb_filter_instance_destroy(ins);
- return -1;
- }
- }
-
- return 0;
-}
-
-void flb_filter_instance_destroy(struct flb_filter_instance *ins)
-{
- if (!ins) {
- return;
- }
-
- /* destroy config map */
- if (ins->config_map) {
- flb_config_map_destroy(ins->config_map);
- }
-
- /* release properties */
- flb_kv_release(&ins->properties);
-
- if (ins->match != NULL) {
- flb_sds_destroy(ins->match);
- }
-
-#ifdef FLB_HAVE_REGEX
- if (ins->match_regex) {
- flb_regex_destroy(ins->match_regex);
- }
-#endif
-
- /* Remove metrics */
-#ifdef FLB_HAVE_METRICS
- if (ins->cmt) {
- cmt_destroy(ins->cmt);
- }
-
- if (ins->metrics) {
- flb_metrics_destroy(ins->metrics);
- }
-#endif
- if (ins->alias) {
- flb_sds_destroy(ins->alias);
- }
-
- mk_list_del(&ins->_head);
- flb_free(ins);
-}
-
-void flb_filter_set_context(struct flb_filter_instance *ins, void *context)
-{
- ins->context = context;
-}