diff options
Diffstat (limited to 'src/fluent-bit/plugins/filter_grep/grep.c')
-rw-r--r-- | src/fluent-bit/plugins/filter_grep/grep.c | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_grep/grep.c b/src/fluent-bit/plugins/filter_grep/grep.c new file mode 100644 index 000000000..7a4657093 --- /dev/null +++ b/src/fluent-bit/plugins/filter_grep/grep.c @@ -0,0 +1,434 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <sys/types.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_filter_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_record_accessor.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> +#include <msgpack.h> + +#include "grep.h" + +static void delete_rules(struct grep_ctx *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct grep_rule *rule; + + mk_list_foreach_safe(head, tmp, &ctx->rules) { + rule = mk_list_entry(head, struct grep_rule, _head); + flb_sds_destroy(rule->field); + flb_free(rule->regex_pattern); + flb_ra_destroy(rule->ra); + flb_regex_destroy(rule->regex); + mk_list_del(&rule->_head); + flb_free(rule); + } +} + +static int set_rules(struct grep_ctx *ctx, struct flb_filter_instance *f_ins) +{ + int first_rule = GREP_NO_RULE; + flb_sds_t tmp; + struct mk_list *head; + struct mk_list *split; + struct flb_split_entry *sentry; + struct flb_kv *kv; + struct grep_rule *rule; + + /* Iterate all filter properties */ + mk_list_foreach(head, &f_ins->properties) { + kv = mk_list_entry(head, struct flb_kv, _head); + + /* Create a new rule */ + rule = flb_malloc(sizeof(struct grep_rule)); + if (!rule) { + flb_errno(); + return -1; + } + + /* Get the type */ + if (strcasecmp(kv->key, "regex") == 0) { + rule->type = GREP_REGEX; + } + else if (strcasecmp(kv->key, "exclude") == 0) { + rule->type = GREP_EXCLUDE; + } + else { + /* Other property. Skip */ + flb_free(rule); + continue; + } + + if (ctx->logical_op != GREP_LOGICAL_OP_LEGACY && first_rule != GREP_NO_RULE) { + /* 'AND'/'OR' case */ + if (first_rule != rule->type) { + flb_plg_error(ctx->ins, "Both 'regex' and 'exclude' are set."); + delete_rules(ctx); + flb_free(rule); + return -1; + } + } + first_rule = rule->type; + + /* As a value we expect a pair of field name and a regular expression */ + split = flb_utils_split(kv->val, ' ', 1); + if (mk_list_size(split) != 2) { + flb_plg_error(ctx->ins, + "invalid regex, expected field and regular expression"); + delete_rules(ctx); + flb_free(rule); + flb_utils_split_free(split); + return -1; + } + + /* Get first value (field) */ + sentry = mk_list_entry_first(split, struct flb_split_entry, _head); + if (*sentry->value == '$') { + rule->field = flb_sds_create_len(sentry->value, sentry->len); + } + else { + rule->field = flb_sds_create_size(sentry->len + 2); + tmp = flb_sds_cat(rule->field, "$", 1); + rule->field = tmp; + + tmp = flb_sds_cat(rule->field, sentry->value, sentry->len); + rule->field = tmp; + } + + /* Get remaining content (regular expression) */ + sentry = mk_list_entry_last(split, struct flb_split_entry, _head); + rule->regex_pattern = flb_strndup(sentry->value, sentry->len); + if (rule->regex_pattern == NULL) { + flb_errno(); + delete_rules(ctx); + flb_free(rule); + flb_utils_split_free(split); + return -1; + } + + /* Release split */ + flb_utils_split_free(split); + + /* Create a record accessor context for this rule */ + rule->ra = flb_ra_create(rule->field, FLB_FALSE); + if (!rule->ra) { + flb_plg_error(ctx->ins, "invalid record accessor? '%s'", rule->field); + delete_rules(ctx); + flb_free(rule); + return -1; + } + + /* Convert string to regex pattern */ + rule->regex = flb_regex_create(rule->regex_pattern); + if (!rule->regex) { + flb_plg_error(ctx->ins, "could not compile regex pattern '%s'", + rule->regex_pattern); + delete_rules(ctx); + flb_free(rule); + return -1; + } + + /* Link to parent list */ + mk_list_add(&rule->_head, &ctx->rules); + } + + return 0; +} + +/* Given a msgpack record, do some filter action based on the defined rules */ +static inline int grep_filter_data(msgpack_object map, struct grep_ctx *ctx) +{ + ssize_t ret; + struct mk_list *head; + struct grep_rule *rule; + + /* For each rule, validate against map fields */ + mk_list_foreach(head, &ctx->rules) { + rule = mk_list_entry(head, struct grep_rule, _head); + + ret = flb_ra_regex_match(rule->ra, map, rule->regex, NULL); + if (ret <= 0) { /* no match */ + if (rule->type == GREP_REGEX) { + return GREP_RET_EXCLUDE; + } + } + else { + if (rule->type == GREP_EXCLUDE) { + return GREP_RET_EXCLUDE; + } + else { + return GREP_RET_KEEP; + } + } + } + + return GREP_RET_KEEP; +} + +static int cb_grep_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + int ret; + size_t len; + const char* val; + struct grep_ctx *ctx; + + /* Create context */ + ctx = flb_malloc(sizeof(struct grep_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + if (flb_filter_config_map_set(f_ins, ctx) < 0) { + flb_errno(); + flb_plg_error(f_ins, "configuration error"); + flb_free(ctx); + return -1; + } + mk_list_init(&ctx->rules); + ctx->ins = f_ins; + + ctx->logical_op = GREP_LOGICAL_OP_LEGACY; + val = flb_filter_get_property("logical_op", f_ins); + if (val != NULL) { + len = strlen(val); + if (len == 3 && strncasecmp("AND", val, len) == 0) { + flb_plg_info(ctx->ins, "AND mode"); + ctx->logical_op = GREP_LOGICAL_OP_AND; + } + else if (len == 2 && strncasecmp("OR", val, len) == 0) { + flb_plg_info(ctx->ins, "OR mode"); + ctx->logical_op = GREP_LOGICAL_OP_OR; + } + else if (len == 6 && strncasecmp("legacy", val, len) == 0) { + flb_plg_info(ctx->ins, "legacy mode"); + ctx->logical_op = GREP_LOGICAL_OP_LEGACY; + } + } + + /* Load rules */ + ret = set_rules(ctx, f_ins); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* Set our context */ + flb_filter_set_context(f_ins, ctx); + return 0; +} + +static inline int grep_filter_data_and_or(msgpack_object map, struct grep_ctx *ctx) +{ + ssize_t ra_ret; + int found = FLB_FALSE; + struct mk_list *head; + struct grep_rule *rule; + + /* For each rule, validate against map fields */ + mk_list_foreach(head, &ctx->rules) { + found = FLB_FALSE; + rule = mk_list_entry(head, struct grep_rule, _head); + + ra_ret = flb_ra_regex_match(rule->ra, map, rule->regex, NULL); + if (ra_ret > 0) { + found = FLB_TRUE; + } + + if (ctx->logical_op == GREP_LOGICAL_OP_OR && found == FLB_TRUE) { + /* OR case: One rule is matched. */ + goto grep_filter_data_and_or_end; + } + else if (ctx->logical_op == GREP_LOGICAL_OP_AND && found == FLB_FALSE) { + /* AND case: One rule is not matched */ + goto grep_filter_data_and_or_end; + } + } + + grep_filter_data_and_or_end: + if (rule->type == GREP_REGEX) { + return found ? GREP_RET_KEEP : GREP_RET_EXCLUDE; + } + + /* rule is exclude */ + return found ? GREP_RET_EXCLUDE : GREP_RET_KEEP; +} + +static int cb_grep_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_size, + struct flb_filter_instance *f_ins, + struct flb_input_instance *i_ins, + void *context, + struct flb_config *config) +{ + int ret; + int old_size = 0; + int new_size = 0; + msgpack_object map; + struct flb_log_event_encoder log_encoder; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct grep_ctx *ctx; + + (void) f_ins; + (void) i_ins; + (void) config; + + ctx = (struct grep_ctx *) context; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return FLB_FILTER_NOTOUCH; + } + + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FILTER_NOTOUCH; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + old_size++; + + /* get time and map */ + map = *log_event.body; + + if (ctx->logical_op == GREP_LOGICAL_OP_LEGACY) { + ret = grep_filter_data(map, ctx); + } + else { + ret = grep_filter_data_and_or(map, ctx); + } + + if (ret == GREP_RET_KEEP) { + ret = flb_log_event_encoder_emit_raw_record( + &log_encoder, + log_decoder.record_base, + log_decoder.record_length); + + new_size++; + } + else if (ret == GREP_RET_EXCLUDE) { + /* Do nothing */ + } + } + + if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA && + log_decoder.offset == bytes) { + ret = FLB_EVENT_ENCODER_SUCCESS; + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* we keep everything ? */ + if (old_size == new_size) { + flb_log_event_encoder_destroy(&log_encoder); + + /* Destroy the buffer to avoid more overhead */ + return FLB_FILTER_NOTOUCH; + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + *out_buf = log_encoder.output_buffer; + *out_size = log_encoder.output_length; + + ret = FLB_FILTER_MODIFIED; + + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + } + else { + flb_plg_error(ctx->ins, + "Log event encoder error : %d", ret); + + ret = FLB_FILTER_NOTOUCH; + } + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return ret; +} + +static int cb_grep_exit(void *data, struct flb_config *config) +{ + struct grep_ctx *ctx = data; + + if (!ctx) { + return 0; + } + + delete_rules(ctx); + flb_free(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "regex", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Keep records in which the content of KEY matches the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "exclude", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Exclude records in which the content of KEY matches the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "logical_op", "legacy", + 0, FLB_FALSE, 0, + "Specify whether to use logical conjuciton or disjunction. legacy, AND and OR are allowed." + }, + {0} +}; + +struct flb_filter_plugin filter_grep_plugin = { + .name = "grep", + .description = "grep events by specified field values", + .cb_init = cb_grep_init, + .cb_filter = cb_grep_filter, + .cb_exit = cb_grep_exit, + .config_map = config_map, + .flags = 0 +}; |