summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_expect/expect.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_expect/expect.c')
-rw-r--r--src/fluent-bit/plugins/filter_expect/expect.c614
1 files changed, 614 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_expect/expect.c b/src/fluent-bit/plugins/filter_expect/expect.c
new file mode 100644
index 000000000..102085c02
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_expect/expect.c
@@ -0,0 +1,614 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include "expect.h"
+#include <msgpack.h>
+
+static int key_to_type(char *key)
+{
+ if (strcasecmp(key, "key_exists") == 0) {
+ return FLB_EXP_KEY_EXISTS;
+ }
+ else if (strcasecmp(key, "key_not_exists") == 0) {
+ return FLB_EXP_KEY_NOT_EXISTS;
+ }
+ else if (strcasecmp(key, "key_val_is_null") == 0) {
+ return FLB_EXP_KEY_VAL_NULL;
+ }
+ else if (strcasecmp(key, "key_val_is_not_null") == 0) {
+ return FLB_EXP_KEY_VAL_NOT_NULL;
+ }
+ else if (strcasecmp(key, "key_val_eq") == 0) {
+ return FLB_EXP_KEY_VAL_EQ;
+ }
+
+ return -1;
+}
+
+/* Create a rule */
+static struct flb_expect_rule *rule_create(struct flb_expect *ctx,
+ int type, char *value)
+{
+ int ret;
+ struct mk_list *list;
+ struct flb_slist_entry *key;
+ struct flb_slist_entry *val;
+ struct flb_expect_rule *rule;
+
+ rule = flb_calloc(1, sizeof(struct flb_expect_rule));
+ if (!rule) {
+ flb_errno();
+ return NULL;
+ }
+ rule->type = type;
+ rule->value = value;
+ rule->expect = NULL;
+
+ /* Only the rule 'key_val_eq' expects two values from the configuration */
+ if (type == FLB_EXP_KEY_VAL_EQ) {
+ list = flb_malloc(sizeof(struct mk_list));
+ if (!list) {
+ flb_errno();
+ flb_free(rule);
+ return NULL;
+ }
+ mk_list_init(list);
+ ret = flb_slist_split_string(list, value, ' ', 1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error reading list of options '%s'",
+ value);
+ flb_free(rule);
+ return NULL;
+ }
+
+ /* Get the 'key' and the expected value */
+ key = mk_list_entry_first(list, struct flb_slist_entry, _head);
+ val = mk_list_entry_last(list, struct flb_slist_entry, _head);
+
+ rule->ra = flb_ra_create(key->str, FLB_TRUE);
+ if (!rule->ra) {
+ flb_plg_error(ctx->ins, "error processing accessor key '%s'",
+ key->str);
+ flb_slist_destroy(list);
+ flb_free(list);
+ flb_free(rule);
+ return NULL;
+ }
+ rule->expect = flb_sds_create(val->str);
+ flb_slist_destroy(list);
+ flb_free(list);
+ }
+ else {
+ rule->ra = flb_ra_create(value, FLB_TRUE);
+ if (!rule->ra) {
+ flb_plg_error(ctx->ins, "error processing accessor key '%s'",
+ value);
+ flb_free(rule);
+ return NULL;
+ }
+ }
+
+ return rule;
+}
+
+static void rule_destroy(struct flb_expect_rule *rule)
+{
+ if (rule->expect) {
+ flb_sds_destroy(rule->expect);
+ }
+ if (rule->ra) {
+ flb_ra_destroy(rule->ra);
+ }
+
+ flb_free(rule);
+}
+
+static void context_destroy(struct flb_expect *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_expect_rule *rule;
+
+ mk_list_foreach_safe(head, tmp, &ctx->rules) {
+ rule = mk_list_entry(head, struct flb_expect_rule, _head);
+ mk_list_del(&rule->_head);
+ rule_destroy(rule);
+ }
+ flb_free(ctx);
+}
+
+static struct flb_expect *context_create(struct flb_filter_instance *ins,
+ struct flb_config *config)
+{
+ int i = 0;
+ int type;
+ int ret;
+ flb_sds_t tmp;
+ struct flb_kv *kv;
+ struct mk_list *head;
+ struct flb_expect *ctx;
+ struct flb_expect_rule *rule;
+
+ ctx = flb_calloc(1, sizeof(struct flb_expect));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ ctx->action = FLB_EXP_WARN;
+ mk_list_init(&ctx->rules);
+
+ /* Get the action property */
+ tmp = (char *) flb_filter_get_property("action", ins);
+ if (tmp) {
+ if (strcasecmp(tmp, "warn") == 0) {
+ ctx->action = FLB_EXP_WARN;
+ }
+ else if (strcasecmp(tmp, "exit") == 0) {
+ ctx->action = FLB_EXP_EXIT;
+ }
+ else if (strcasecmp(tmp, "result_key") == 0) {
+ ctx->action = FLB_EXP_RESULT_KEY;
+ }
+ else {
+ flb_plg_error(ctx->ins, "unexpected 'action' value '%s'", tmp);
+ flb_free(ctx);
+ return NULL;
+ }
+ }
+
+ /* Load config map */
+ ret = flb_filter_config_map_set(ins, ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Read the configuration properties */
+ mk_list_foreach(head, &ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ /* Validate the type of the rule */
+ type = key_to_type(kv->key);
+ if (strcasecmp(kv->key, "result_key") == 0) {
+ /* skip */
+ continue;
+ }
+
+ if (type == -1 && strcasecmp(kv->key, "action") != 0) {
+ flb_plg_error(ctx->ins, "unknown configuration rule '%s'", kv->key);
+ context_destroy(ctx);
+ return NULL;
+ }
+
+ rule = rule_create(ctx, type, kv->val);
+ if (!rule) {
+ context_destroy(ctx);
+ return NULL;
+ }
+ mk_list_add(&rule->_head, &ctx->rules);
+
+ /* Debug message */
+ if (rule->type == -1) {
+ flb_plg_debug(ctx->ins, "action : '%s'", kv->val);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "rule #%i: '%s', expects: '%s'",
+ i, kv->key, kv->val);
+ }
+ i++;
+ }
+
+ return ctx;
+
+}
+
+static int cb_expect_init(struct flb_filter_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ struct flb_expect *ctx;
+
+ /* Create the plugin context */
+ ctx = context_create(ins, config);
+ if (!ctx) {
+ return -1;
+ }
+
+ /* Set filter context */
+ flb_filter_set_context(ins, ctx);
+
+ if (mk_list_size(&ctx->rules) == 0) {
+ flb_plg_warn(ctx->ins, "no rules has been defined");
+ }
+
+ return 0;
+}
+
+static char *ra_value_type_to_str(struct flb_ra_value *val)
+{
+ if (val->type == FLB_RA_BOOL) {
+ return "boolean";
+ }
+ else if (val->type == FLB_RA_INT) {
+ return "integer";
+ }
+ else if (val->type == FLB_RA_FLOAT) {
+ return "float / double";
+ }
+ else if (val->type == FLB_RA_STRING) {
+ return "string";
+ }
+ else if (val->type == FLB_RA_NULL) {
+ return "null";
+ }
+
+ return "UNKNOWN";
+}
+
+static int rule_apply(struct flb_expect *ctx, msgpack_object map)
+{
+ int n = 0;
+ char *json;
+ size_t size = 1024;
+ struct mk_list *head;
+ struct flb_expect_rule *rule;
+ struct flb_ra_value *val;
+
+ mk_list_foreach(head, &ctx->rules) {
+ rule = mk_list_entry(head, struct flb_expect_rule, _head);
+
+ val = flb_ra_get_value_object(rule->ra, map);
+ if (rule->type == FLB_EXP_KEY_EXISTS) {
+ if (val) {
+ flb_ra_key_value_destroy(val);
+ n++;
+ continue;
+ }
+
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_exists', key '%s' "
+ "not found. Record content:\n%s",
+ n, rule->value, json);
+ flb_free(json);
+ return FLB_FALSE;
+ }
+ else if (rule->type == FLB_EXP_KEY_NOT_EXISTS) {
+ if (!val) {
+ n++;
+ continue;
+ }
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_not_exists', key '%s' "
+ "exists. Record content:\n%s",
+ n, rule->value, json);
+ flb_free(json);
+ flb_ra_key_value_destroy(val);
+ return FLB_FALSE;
+ }
+ else if (rule->type == FLB_EXP_KEY_VAL_NULL) {
+ if (!val) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_is_null', "
+ "key '%s' not found. Record content:\n%s",
+ n, rule->value, json);
+ flb_free(json);
+ return FLB_FALSE;
+ }
+ if (val->type != FLB_RA_NULL) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_is_null', "
+ "key '%s' contains a value type '%s'. "
+ "Record content:\n%s",
+ n, rule->value,
+ ra_value_type_to_str(val), json);
+ flb_free(json);
+ flb_ra_key_value_destroy(val);
+ return FLB_FALSE;
+ }
+ flb_ra_key_value_destroy(val);
+ }
+ else if (rule->type == FLB_EXP_KEY_VAL_NOT_NULL) {
+ if (!val) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_is_not_null', "
+ "key '%s' not found. Record content:\n%s",
+ n, rule->value, json);
+ flb_free(json);
+ return FLB_FALSE;
+ }
+ if (val->type == FLB_RA_NULL) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_is_not_null', "
+ "key '%s' contains a value type '%s'. "
+ "Record content:\n%s",
+ n, rule->value,
+ ra_value_type_to_str(val), json);
+ flb_free(json);
+ flb_ra_key_value_destroy(val);
+ return FLB_FALSE;
+ }
+ flb_ra_key_value_destroy(val);
+ }
+ else if (rule->type == FLB_EXP_KEY_VAL_EQ) {
+ if (!val) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_is_null', "
+ "key '%s' not found. Record content:\n%s",
+ n, rule->value, json);
+ flb_free(json);
+ return FLB_FALSE;
+ }
+
+ if (val->type == FLB_RA_STRING) {
+ if (flb_sds_cmp(val->val.string, rule->expect,
+ flb_sds_len(rule->expect)) != 0) {
+ json = flb_msgpack_to_json_str(size, &map);
+ flb_plg_error(ctx->ins,
+ "exception on rule #%i 'key_val_eq', "
+ "key value '%s' is different than "
+ "expected: '%s'. Record content:\n%s",
+ n, val->val.string, rule->expect, json);
+ flb_free(json);
+ flb_ra_key_value_destroy(val);
+ return FLB_FALSE;
+ }
+ }
+ flb_ra_key_value_destroy(val);
+ }
+ n++;
+ }
+
+ return FLB_TRUE;
+}
+
+static int cb_expect_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ int ret;
+ int i;
+ int rule_matched = FLB_TRUE;
+ msgpack_object_kv *kv;
+ struct flb_expect *ctx = filter_context;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ (void) out_buf;
+ (void) out_bytes;
+ (void) f_ins;
+ (void) i_ins;
+ (void) config;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ ret = rule_apply(ctx, *log_event.body);
+ if (ret == FLB_TRUE) {
+ /* rule matches, we are good */
+ continue;
+ }
+ else {
+ if (ctx->action == FLB_EXP_WARN) {
+ flb_plg_warn(ctx->ins, "expect check failed");
+ }
+ else if (ctx->action == FLB_EXP_EXIT) {
+ flb_engine_exit_status(config, 255);
+ }
+ else if (ctx->action == FLB_EXP_RESULT_KEY) {
+ rule_matched = FLB_FALSE;
+ }
+ break;
+ }
+ }
+
+ ret = 0;
+ /* Append result key when action is "result_key"*/
+ if (ctx->action == FLB_EXP_RESULT_KEY) {
+ flb_log_event_decoder_reset(&log_decoder, (char *) data, bytes);
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ ret = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &log_encoder, &log_event.timestamp);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_metadata_from_msgpack_object(&log_encoder,
+ log_event.metadata);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_STRING_VALUE(ctx->result_key, flb_sds_len(ctx->result_key)),
+ FLB_LOG_EVENT_BOOLEAN_VALUE(rule_matched));
+ }
+
+ kv = log_event.body->via.map.ptr;
+ for (i=0 ;
+ i < log_event.body->via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS ;
+ i++) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&log_encoder);
+ }
+ }
+
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_bytes = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+}
+
+static int cb_expect_exit(void *data, struct flb_config *config)
+{
+ struct flb_expect *ctx = data;
+ (void) config;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ context_destroy(ctx);
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] =
+{
+ /* rule: the key exists in the record */
+ {
+ FLB_CONFIG_MAP_STR, "key_exists", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "check that the given key name exists in the record"
+ },
+
+ /* rule: the key not exists in the record */
+ {
+ FLB_CONFIG_MAP_STR, "key_not_exists", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "check that the given key name do not exists in the record"
+ },
+
+ /* rule: the value of the key is NULL */
+ {
+ FLB_CONFIG_MAP_STR, "key_val_is_null", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "check that the value of the key is NULL"
+ },
+
+ /* rule: the value of the key is NOT NULL */
+ {
+ FLB_CONFIG_MAP_STR, "key_val_is_not_null", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "check that the value of the key is NOT NULL"
+ },
+
+ /* rule: the value of the key is equal a given value */
+ {
+ FLB_CONFIG_MAP_SLIST_1, "key_val_eq", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "check that the value of the key equals the given value"
+ },
+
+ /* rule action: the value of the key is equal a given value */
+ {
+ FLB_CONFIG_MAP_STR, "action", "warn",
+ 0, FLB_FALSE, 0,
+ "action to take when a rule does not match: 'warn', 'exit' or 'result_key'."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "result_key", "matched",
+ 0, FLB_TRUE, offsetof(struct flb_expect, result_key),
+ "specify the key name to append a boolean that indicates rule is matched or not. "
+ "This key is to be used only when 'action' is 'result_key'."
+ },
+
+ /* EOF */
+ {0}
+};
+
+struct flb_filter_plugin filter_expect_plugin = {
+ .name = "expect",
+ .description = "Validate expected keys and values",
+ .cb_init = cb_expect_init,
+ .cb_filter = cb_expect_filter,
+ .cb_exit = cb_expect_exit,
+ .config_map = config_map,
+ .flags = 0
+};