summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_modify/modify.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_modify/modify.c')
-rw-r--r--src/fluent-bit/plugins/filter_modify/modify.c1659
1 files changed, 1659 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_modify/modify.c b/src/fluent-bit/plugins/filter_modify/modify.c
new file mode 100644
index 000000000..22d2e21c0
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_modify/modify.c
@@ -0,0 +1,1659 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <msgpack.h>
+
+#include "modify.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+
+static void condition_free(struct modify_condition *condition)
+{
+ if (condition == NULL) {
+ return;
+ }
+
+ if (condition->a) {
+ flb_sds_destroy(condition->a);
+ }
+ if (condition->b) {
+ flb_free(condition->b);
+ }
+ if (condition->raw_k) {
+ flb_free(condition->raw_k);
+ }
+ if (condition->raw_v) {
+ flb_free(condition->raw_v);
+ }
+
+ if (condition->a_regex) {
+ flb_regex_destroy(condition->a_regex);
+ }
+ if (condition->b_regex) {
+ flb_regex_destroy(condition->b_regex);
+ }
+ if (condition->ra_a) {
+ flb_ra_destroy(condition->ra_a);
+ condition->ra_a = NULL;
+ }
+ if (!mk_list_entry_is_orphan(&condition->_head)) {
+ mk_list_del(&condition->_head);
+ }
+ flb_free(condition);
+}
+
+static void rule_free(struct modify_rule *rule)
+{
+ if (rule == NULL) {
+ return;
+ }
+
+ if (rule->key) {
+ flb_free(rule->key);
+ }
+ if (rule->val) {
+ flb_free(rule->val);
+ }
+ if (rule->raw_k) {
+ flb_free(rule->raw_k);
+ }
+ if (rule->raw_v) {
+ flb_free(rule->raw_v);
+ }
+ if (rule->key_regex) {
+ flb_regex_destroy(rule->key_regex);
+ }
+ if (rule->val_regex) {
+ flb_regex_destroy(rule->val_regex);
+ }
+ if (!mk_list_entry_is_orphan(&rule->_head)) {
+ mk_list_del(&rule->_head);
+ }
+ flb_free(rule);
+}
+
+static void teardown(struct filter_modify_ctx *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ struct modify_rule *rule;
+ struct modify_condition *condition;
+
+ mk_list_foreach_safe(head, tmp, &ctx->conditions) {
+ condition = mk_list_entry(head, struct modify_condition, _head);
+ condition_free(condition);
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->rules) {
+ rule = mk_list_entry(head, struct modify_rule, _head);
+ rule_free(rule);
+ }
+}
+
+static void helper_pack_string(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer, const char *str,
+ int len)
+{
+
+ if (str == NULL) {
+ flb_plg_error(ctx->ins, "helper_pack_string : NULL passed");
+ msgpack_pack_nil(packer);
+ }
+ else {
+ msgpack_pack_str(packer, len);
+ msgpack_pack_str_body(packer, str, len);
+ }
+}
+
+static int setup(struct filter_modify_ctx *ctx,
+ struct flb_filter_instance *f_ins, struct flb_config *config)
+{
+ struct mk_list *head;
+ struct mk_list *split;
+ struct flb_kv *kv;
+ struct flb_split_entry *sentry;
+ struct modify_rule *rule = NULL;
+ struct modify_condition *condition;
+
+ int list_size;
+
+ // Split list
+ // - Arg 1 is condition?
+ // --> Setup Condition
+ // - Malloc Condition
+ // - Switch list size
+ // --> Setup Rule
+ // - Malloc Rule
+ // - Switch list size
+
+ if (flb_filter_config_map_set(f_ins, ctx) < 0) {
+ flb_errno();
+ flb_plg_error(f_ins, "configuration error");
+ return -1;
+ }
+
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ split = flb_utils_split_quoted(kv->val, ' ', 3);
+ list_size = mk_list_size(split);
+
+ // Conditions are,
+ // CONDITION CONDITIONTYPE VAL_A VAL_B
+
+ if (list_size == 0 || list_size > 3) {
+ flb_plg_error(ctx->ins, "Invalid config for %s", kv->key);
+ teardown(ctx);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ else if (strcasecmp(kv->key, "condition") == 0) {
+
+ //
+ // Build a condition
+ //
+
+ condition = flb_calloc(1, sizeof(struct modify_condition));
+ if (!condition) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for "
+ "condition");
+ teardown(ctx);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ condition->a_is_regex = false;
+ condition->b_is_regex = false;
+ condition->ra_a = NULL;
+ condition->raw_k = flb_strndup(kv->key, flb_sds_len(kv->key));
+ if (condition->raw_k == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for "
+ "condition->raw_k");
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ condition->raw_v = flb_strndup(kv->val, flb_sds_len(kv->val));
+ if (condition->raw_v == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for "
+ "condition->raw_v");
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ sentry =
+ mk_list_entry_first(split, struct flb_split_entry, _head);
+
+ if (strcasecmp(sentry->value, "key_exists") == 0) {
+ condition->conditiontype = KEY_EXISTS;
+ }
+ else if (strcasecmp(sentry->value, "key_does_not_exist") == 0) {
+ condition->conditiontype = KEY_DOES_NOT_EXIST;
+ }
+ else if (strcasecmp(sentry->value, "a_key_matches") == 0) {
+ condition->conditiontype = A_KEY_MATCHES;
+ condition->a_is_regex = true;
+ }
+ else if (strcasecmp(sentry->value, "no_key_matches") == 0) {
+ condition->conditiontype = NO_KEY_MATCHES;
+ condition->a_is_regex = true;
+ }
+ else if (strcasecmp(sentry->value, "key_value_equals") == 0) {
+ condition->conditiontype = KEY_VALUE_EQUALS;
+ }
+ else if (strcasecmp(sentry->value, "key_value_does_not_equal") ==
+ 0) {
+ condition->conditiontype = KEY_VALUE_DOES_NOT_EQUAL;
+ }
+ else if (strcasecmp(sentry->value, "key_value_matches") == 0) {
+ condition->conditiontype = KEY_VALUE_MATCHES;
+ condition->b_is_regex = true;
+ }
+ else if (strcasecmp(sentry->value, "key_value_does_not_match") ==
+ 0) {
+ condition->conditiontype = KEY_VALUE_DOES_NOT_MATCH;
+ condition->b_is_regex = true;
+ }
+ else if (strcasecmp
+ (sentry->value,
+ "matching_keys_have_matching_values") == 0) {
+ condition->conditiontype = MATCHING_KEYS_HAVE_MATCHING_VALUES;
+ condition->a_is_regex = true;
+ condition->b_is_regex = true;
+ }
+ else if (strcasecmp
+ (sentry->value,
+ "matching_keys_do_not_have_matching_values") == 0) {
+ condition->conditiontype =
+ MATCHING_KEYS_DO_NOT_HAVE_MATCHING_VALUES;
+ condition->a_is_regex = true;
+ condition->b_is_regex = true;
+ }
+ else {
+ flb_plg_error(ctx->ins, "Invalid config for %s : %s",
+ kv->key, kv->val);
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ sentry =
+ mk_list_entry_next(&sentry->_head, struct flb_split_entry,
+ _head, split);
+ condition->a = flb_sds_create_len(sentry->value, sentry->len);
+ condition->a_len = sentry->len;
+ condition->ra_a = flb_ra_create(condition->a, FLB_FALSE);
+ if (list_size == 3) {
+ sentry =
+ mk_list_entry_last(split, struct flb_split_entry, _head);
+ condition->b = flb_strndup(sentry->value, sentry->len);
+ if (condition->b == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for "
+ "condition->b");
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ condition->b_len = sentry->len;
+ }
+ else {
+ condition->b = NULL;
+ condition->b_len = 0;
+ }
+
+ if (condition->a_is_regex) {
+ if (condition->a_len < 1) {
+ flb_plg_error(ctx->ins, "Unable to create regex for "
+ "condition %s %s",
+ condition->raw_k, condition->raw_v);
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Creating regex for condition A : "
+ "%s %s : %s",
+ condition->raw_k, condition->raw_v,
+ condition->a);
+ condition->a_regex =
+ flb_regex_create(condition->a);
+ }
+ }
+
+ if (condition->b_is_regex) {
+ if (condition->b_len < 1) {
+ flb_plg_error(ctx->ins, "Unable to create regex "
+ "for condition %s %s",
+ condition->raw_k, condition->raw_v);
+ teardown(ctx);
+ condition_free(condition);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Creating regex for condition B : %s "
+ "%s : %s",
+ condition->raw_k, condition->raw_v, condition->b);
+ condition->b_regex =
+ flb_regex_create(condition->b);
+ }
+ }
+
+ flb_utils_split_free(split);
+
+ mk_list_add(&condition->_head, &ctx->conditions);
+ ctx->conditions_cnt++;
+ }
+ else {
+
+ //
+ // Build a rule
+ //
+
+ rule = flb_calloc(1, sizeof(struct modify_rule));
+ if (!rule) {
+ flb_plg_error(ctx->ins, "Unable to allocate memory for rule");
+ teardown(ctx);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ rule->key_is_regex = false;
+ rule->val_is_regex = false;
+ rule->raw_k = flb_strndup(kv->key, flb_sds_len(kv->key));
+ if (rule->raw_k == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for rule->raw_k");
+ teardown(ctx);
+ rule_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ rule->raw_v = flb_strndup(kv->val, flb_sds_len(kv->val));
+ if (rule->raw_v == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for rule->raw_v");
+ teardown(ctx);
+ rule_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ sentry =
+ mk_list_entry_first(split, struct flb_split_entry, _head);
+ rule->key = flb_strndup(sentry->value, sentry->len);
+ if (rule->key == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for rule->key");
+ teardown(ctx);
+ rule_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ rule->key_len = sentry->len;
+
+ sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
+ rule->val = flb_strndup(sentry->value, sentry->len);
+ if (rule->val == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Unable to allocate memory for rule->val");
+ teardown(ctx);
+ rule_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+ rule->val_len = sentry->len;
+
+ flb_utils_split_free(split);
+
+ if (list_size == 1) {
+ if (strcasecmp(kv->key, "remove") == 0) {
+ rule->ruletype = REMOVE;
+ }
+ else if (strcasecmp(kv->key, "remove_wildcard") == 0) {
+ rule->ruletype = REMOVE_WILDCARD;
+ }
+ else if (strcasecmp(kv->key, "remove_regex") == 0) {
+ rule->ruletype = REMOVE_REGEX;
+ rule->key_is_regex = true;
+ }
+ else if (strcasecmp(kv->key, "move_to_start") == 0) {
+ rule->ruletype = MOVE_TO_START;
+ }
+ else if (strcasecmp(kv->key, "move_to_end") == 0) {
+ rule->ruletype = MOVE_TO_END;
+ }
+ else {
+ flb_plg_error(ctx->ins, "Invalid operation %s : %s in "
+ "configuration", kv->key, kv->val);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ }
+ else if (list_size == 2) {
+ if (strcasecmp(kv->key, "rename") == 0) {
+ rule->ruletype = RENAME;
+ }
+ else if (strcasecmp(kv->key, "hard_rename") == 0) {
+ rule->ruletype = HARD_RENAME;
+ }
+ else if (strcasecmp(kv->key, "add") == 0) {
+ rule->ruletype = ADD;
+ }
+ else if (strcasecmp(kv->key, "add_if_not_present") == 0) {
+ flb_plg_info(ctx->ins, "DEPRECATED : Operation "
+ "'add_if_not_present' has been replaced "
+ "by 'add'.");
+ rule->ruletype = ADD;
+ }
+ else if (strcasecmp(kv->key, "set") == 0) {
+ rule->ruletype = SET;
+ }
+ else if (strcasecmp(kv->key, "copy") == 0) {
+ rule->ruletype = COPY;
+ }
+ else if (strcasecmp(kv->key, "hard_copy") == 0) {
+ rule->ruletype = HARD_COPY;
+ }
+ else {
+ flb_plg_error(ctx->ins, "Invalid operation %s : %s in "
+ "configuration", kv->key, kv->val);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ }
+
+ if (rule->key_is_regex && rule->key_len == 0) {
+ flb_plg_error(ctx->ins, "Unable to create regex for rule %s %s",
+ rule->raw_k, rule->raw_v);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ else {
+ rule->key_regex =
+ flb_regex_create(rule->key);
+ if (rule->key_regex == NULL) {
+ flb_plg_error(ctx->ins, "Unable to create regex(key) from %s",
+ rule->key);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ }
+
+ if (rule->val_is_regex && rule->val_len == 0) {
+ flb_plg_error(ctx->ins, "Unable to create regex for rule %s %s",
+ rule->raw_k, rule->raw_v);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ else {
+ rule->val_regex =
+ flb_regex_create(rule->val);
+ if (rule->val_regex == NULL) {
+ flb_plg_error(ctx->ins, "Unable to create regex(val) from %s",
+ rule->val);
+ teardown(ctx);
+ rule_free(rule);
+ return -1;
+ }
+ }
+
+ mk_list_add(&rule->_head, &ctx->rules);
+ ctx->rules_cnt++;
+ }
+
+ }
+
+ flb_plg_debug(ctx->ins, "Initialized modify filter with %d conditions "
+ "and %d rules",
+ ctx->conditions_cnt, ctx->rules_cnt);
+ return 0;
+}
+
+
+/* Regex matchers */
+static inline bool helper_msgpack_object_matches_regex(msgpack_object * obj,
+ struct flb_regex
+ *regex)
+{
+ int len;
+ const char *key;
+
+ if (obj->type == MSGPACK_OBJECT_BIN) {
+ return false;
+ }
+ else if (obj->type == MSGPACK_OBJECT_STR) {
+ key = obj->via.str.ptr;
+ len = obj->via.str.size;
+ }
+ else if (obj->type == MSGPACK_OBJECT_BOOLEAN) {
+ if (obj->via.boolean) {
+ key = "true";
+ len = 4;
+ }
+ else {
+ key = "false";
+ len = 5;
+ }
+ }
+ else {
+ return false;
+ }
+
+ return flb_regex_match(regex, (unsigned char *) key, len) > 0;
+}
+
+static inline bool kv_key_matches_regex(msgpack_object_kv * kv,
+ struct flb_regex *regex)
+{
+ return helper_msgpack_object_matches_regex(&kv->key, regex);
+}
+
+static inline bool kv_val_matches_regex(msgpack_object_kv * kv,
+ struct flb_regex *regex)
+{
+ return helper_msgpack_object_matches_regex(&kv->val, regex);
+}
+
+static inline bool kv_key_matches_regex_rule_key(msgpack_object_kv * kv,
+ struct modify_rule *rule)
+{
+ return kv_key_matches_regex(kv, rule->key_regex);
+}
+
+static inline bool kv_key_does_not_match_regex_rule_key(msgpack_object_kv *
+ kv,
+ struct modify_rule
+ *rule)
+{
+ return !kv_key_matches_regex_rule_key(kv, rule);
+}
+
+static inline int map_count_keys_matching_regex(msgpack_object * map,
+ struct flb_regex *regex)
+{
+ int i;
+ int count = 0;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ if (kv_key_matches_regex(&map->via.map.ptr[i], regex)) {
+ count++;
+ }
+ }
+ return count;
+}
+
+
+/*
+ * Wildcard matchers
+ */
+
+static inline bool helper_msgpack_object_matches_wildcard(msgpack_object *
+ obj, char *str,
+ int len)
+{
+ const char *key;
+
+ if (obj->type == MSGPACK_OBJECT_BIN) {
+ key = obj->via.bin.ptr;
+ }
+ else if (obj->type == MSGPACK_OBJECT_STR) {
+ key = obj->via.str.ptr;
+ }
+ else {
+ return false;
+ }
+
+ return (strncmp(str, key, len) == 0);
+}
+
+static inline bool kv_key_matches_wildcard(msgpack_object_kv * kv,
+ char *str, int len)
+{
+ return helper_msgpack_object_matches_wildcard(&kv->key, str, len);
+}
+
+static inline bool kv_key_matches_wildcard_rule_key(msgpack_object_kv * kv,
+ struct modify_rule *rule)
+{
+ return kv_key_matches_wildcard(kv, rule->key, rule->key_len);
+}
+
+static inline bool kv_key_does_not_match_wildcard_rule_key(msgpack_object_kv *
+ kv,
+ struct modify_rule
+ *rule)
+{
+ return !kv_key_matches_wildcard_rule_key(kv, rule);
+}
+
+static inline int map_count_keys_matching_wildcard(msgpack_object * map,
+ char *str, int len)
+{
+ int i;
+ int count = 0;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ if (kv_key_matches_wildcard(&map->via.map.ptr[i], str, len)) {
+ count++;
+ }
+ }
+ return count;
+}
+
+//
+// String matchers
+//
+
+static inline bool helper_msgpack_object_matches_str(msgpack_object * obj,
+ char *str, int len)
+{
+
+ const char *key;
+ int klen;
+
+ if (obj->type == MSGPACK_OBJECT_BIN) {
+ key = obj->via.bin.ptr;
+ klen = obj->via.bin.size;
+ }
+ else if (obj->type == MSGPACK_OBJECT_STR) {
+ key = obj->via.str.ptr;
+ klen = obj->via.str.size;
+ }
+ else {
+ return false;
+ }
+
+ return ((len == klen) && (strncmp(str, key, klen) == 0)
+ );
+}
+
+static inline bool kv_key_matches_str(msgpack_object_kv * kv,
+ char *str, int len)
+{
+ return helper_msgpack_object_matches_str(&kv->key, str, len);
+}
+
+static inline bool kv_key_matches_str_rule_key(msgpack_object_kv * kv,
+ struct modify_rule *rule)
+{
+ return kv_key_matches_str(kv, rule->key, rule->key_len);
+}
+
+static inline bool kv_key_does_not_match_str_rule_key(msgpack_object_kv * kv,
+ struct modify_rule
+ *rule)
+{
+ return !kv_key_matches_str_rule_key(kv, rule);
+}
+
+static inline bool kv_key_matches_str_rule_val(msgpack_object_kv * kv,
+ struct modify_rule *rule)
+{
+ return kv_key_matches_str(kv, rule->val, rule->val_len);
+}
+
+static inline int map_count_keys_matching_str(msgpack_object * map,
+ char *str, int len)
+{
+ int i;
+ int count = 0;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ if (kv_key_matches_str(&map->via.map.ptr[i], str, len)) {
+ count++;
+ }
+ }
+ return count;
+}
+
+static inline void map_pack_each(msgpack_packer * packer,
+ msgpack_object * map)
+{
+ int i;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ msgpack_pack_object(packer, map->via.map.ptr[i].key);
+ msgpack_pack_object(packer, map->via.map.ptr[i].val);
+ }
+}
+
+static inline void map_pack_each_fn(msgpack_packer * packer,
+ msgpack_object * map,
+ struct modify_rule *rule,
+ bool(*f) (msgpack_object_kv * kv,
+ struct modify_rule * rule)
+ )
+{
+ int i;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ if ((*f) (&map->via.map.ptr[i], rule)) {
+ msgpack_pack_object(packer, map->via.map.ptr[i].key);
+ msgpack_pack_object(packer, map->via.map.ptr[i].val);
+ }
+ }
+}
+
+static inline bool evaluate_condition_KEY_EXISTS(msgpack_object * map,
+ struct modify_condition
+ *condition)
+{
+ msgpack_object *skey = NULL;
+ msgpack_object *okey = NULL;
+ msgpack_object *oval = NULL;
+
+ flb_ra_get_kv_pair(condition->ra_a, *map, &skey, &okey, &oval);
+ if (skey == NULL || okey == NULL || oval == NULL) {
+ return false;
+ }
+ return true;
+}
+
+static inline bool evaluate_condition_KEY_DOES_NOT_EXIST(msgpack_object * map,
+ struct
+ modify_condition
+ *condition)
+{
+ return !evaluate_condition_KEY_EXISTS(map, condition);
+}
+
+static inline bool evaluate_condition_A_KEY_MATCHES(msgpack_object * map,
+ struct modify_condition
+ *condition)
+{
+ return (map_count_keys_matching_regex(map, condition->a_regex) > 0);
+}
+
+static inline bool evaluate_condition_NO_KEY_MATCHES(msgpack_object * map,
+ struct
+ modify_condition
+ *condition)
+{
+ return !evaluate_condition_A_KEY_MATCHES(map, condition);
+}
+
+static inline bool evaluate_condition_KEY_VALUE_EQUALS(struct filter_modify_ctx *ctx,
+ msgpack_object * map,
+ struct
+ modify_condition
+ *condition)
+{
+ msgpack_object *skey = NULL;
+ msgpack_object *okey = NULL;
+ msgpack_object *oval = NULL;
+ bool ret = false;
+
+ flb_ra_get_kv_pair(condition->ra_a, *map, &skey, &okey, &oval);
+ if (skey == NULL || okey == NULL || oval == NULL) {
+ return false;
+ }
+ ret = helper_msgpack_object_matches_str(oval, condition->b, condition->b_len);
+ if (ret) {
+ flb_plg_debug(ctx->ins, "Match for condition KEY_VALUE_EQUALS %s",
+ condition->b);
+ }
+ return ret;
+}
+
+static inline
+bool evaluate_condition_KEY_VALUE_DOES_NOT_EQUAL(struct filter_modify_ctx *ctx,
+ msgpack_object
+ *map,
+ struct
+ modify_condition
+ *condition)
+{
+ if (!evaluate_condition_KEY_EXISTS(map, condition)) {
+ return false;
+ }
+ return !evaluate_condition_KEY_VALUE_EQUALS(ctx, map, condition);
+}
+
+static inline bool evaluate_condition_KEY_VALUE_MATCHES(struct filter_modify_ctx *ctx,
+ msgpack_object *map,
+ struct
+ modify_condition
+ *condition)
+{
+ msgpack_object *skey = NULL;
+ msgpack_object *okey = NULL;
+ msgpack_object *oval = NULL;
+ bool ret = false;
+
+ flb_ra_get_kv_pair(condition->ra_a, *map, &skey, &okey, &oval);
+ if (skey == NULL || okey == NULL || oval == NULL) {
+ return false;
+ }
+ ret = helper_msgpack_object_matches_regex(oval, condition->b_regex);
+ if (ret) {
+ flb_plg_debug(ctx->ins, "Match for condition KEY_VALUE_MATCHES "
+ "%s", condition->b);
+ }
+ return ret;
+}
+
+static inline
+bool evaluate_condition_KEY_VALUE_DOES_NOT_MATCH(struct filter_modify_ctx *ctx,
+ msgpack_object
+ * map,
+ struct
+ modify_condition
+ *condition)
+{
+ if (!evaluate_condition_KEY_EXISTS(map, condition)) {
+ return false;
+ }
+ return !evaluate_condition_KEY_VALUE_MATCHES(ctx, map, condition);
+}
+
+static inline bool
+evaluate_condition_MATCHING_KEYS_HAVE_MATCHING_VALUES(struct filter_modify_ctx *ctx,
+ msgpack_object *map,
+ struct modify_condition
+ *condition)
+{
+ int i;
+ bool match = true;
+ msgpack_object_kv *kv;
+
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+ if (kv_key_matches_regex(kv, condition->a_regex)) {
+ if (!kv_val_matches_regex(kv, condition->b_regex)) {
+ flb_plg_debug(ctx->ins, "Match MISSED for condition "
+ "MATCHING_KEYS_HAVE_MATCHING_VALUES %s",
+ condition->b);
+ match = false;
+ break;
+ }
+ }
+ }
+ return match;
+}
+
+static inline bool
+evaluate_condition_MATCHING_KEYS_DO_NOT_HAVE_MATCHING_VALUES(struct filter_modify_ctx *ctx,
+ msgpack_object *
+ map,
+ struct
+ modify_condition
+ *condition)
+{
+ return !evaluate_condition_MATCHING_KEYS_HAVE_MATCHING_VALUES(ctx,
+ map,
+ condition);
+}
+
+static inline bool evaluate_condition(struct filter_modify_ctx *ctx,
+ msgpack_object * map,
+ struct modify_condition *condition)
+{
+ switch (condition->conditiontype) {
+ case KEY_EXISTS:
+ return evaluate_condition_KEY_EXISTS(map, condition);
+ case KEY_DOES_NOT_EXIST:
+ return evaluate_condition_KEY_DOES_NOT_EXIST(map, condition);
+ case A_KEY_MATCHES:
+ return evaluate_condition_A_KEY_MATCHES(map, condition);
+ case NO_KEY_MATCHES:
+ return evaluate_condition_NO_KEY_MATCHES(map, condition);
+ case KEY_VALUE_EQUALS:
+ return evaluate_condition_KEY_VALUE_EQUALS(ctx, map, condition);
+ case KEY_VALUE_DOES_NOT_EQUAL:
+ return evaluate_condition_KEY_VALUE_DOES_NOT_EQUAL(ctx, map, condition);
+ case KEY_VALUE_MATCHES:
+ return evaluate_condition_KEY_VALUE_MATCHES(ctx, map, condition);
+ case KEY_VALUE_DOES_NOT_MATCH:
+ return evaluate_condition_KEY_VALUE_DOES_NOT_MATCH(ctx, map, condition);
+ case MATCHING_KEYS_HAVE_MATCHING_VALUES:
+ return evaluate_condition_MATCHING_KEYS_HAVE_MATCHING_VALUES(ctx,
+ map,
+ condition);
+ case MATCHING_KEYS_DO_NOT_HAVE_MATCHING_VALUES:
+ return
+ evaluate_condition_MATCHING_KEYS_DO_NOT_HAVE_MATCHING_VALUES(ctx,
+ map,
+ condition);
+ default:
+ flb_plg_warn(ctx->ins, "Unknown conditiontype for condition %s : %s, "
+ "assuming result FAILED TO MEET CONDITION",
+ condition->raw_k, condition->raw_v);
+ }
+ return false;
+}
+
+static inline bool evaluate_conditions(msgpack_object * map,
+ struct filter_modify_ctx *ctx)
+{
+ bool ok = true;
+
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct modify_condition *condition;
+
+ mk_list_foreach_safe(head, tmp, &ctx->conditions) {
+ condition = mk_list_entry(head, struct modify_condition, _head);
+ if (!evaluate_condition(ctx, map, condition)) {
+ flb_plg_debug(ctx->ins, "Condition not met : %s",
+ condition->raw_v);
+ ok = false;
+ }
+ }
+
+ return ok;
+}
+
+static inline int apply_rule_RENAME(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ int i;
+
+ int match_keys =
+ map_count_keys_matching_str(map, rule->key, rule->key_len);
+ int conflict_keys =
+ map_count_keys_matching_str(map, rule->val, rule->val_len);
+
+ if (match_keys == 0) {
+ flb_plg_debug(ctx->ins, "Rule RENAME %s TO %s : No keys matching %s "
+ "found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (conflict_keys > 0) {
+ flb_plg_debug(ctx->ins, "Rule RENAME %s TO %s : Existing key %s found, "
+ "not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size);
+ for (i = 0; i < map->via.map.size; i++) {
+ if (kv_key_matches_str_rule_key(&map->via.map.ptr[i], rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ }
+ else {
+ msgpack_pack_object(packer, map->via.map.ptr[i].key);
+ }
+ msgpack_pack_object(packer, map->via.map.ptr[i].val);
+ }
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_HARD_RENAME(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ int i;
+
+ int match_keys =
+ map_count_keys_matching_str(map, rule->key, rule->key_len);
+ int conflict_keys =
+ map_count_keys_matching_str(map, rule->val, rule->val_len);
+ msgpack_object_kv *kv;
+
+ if (match_keys == 0) {
+ flb_plg_debug(ctx->ins, "Rule HARD_RENAME %s TO %s : No keys matching "
+ "%s found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (conflict_keys == 0) {
+ msgpack_pack_map(packer, map->via.map.size);
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+ if (kv_key_matches_str_rule_key(kv, rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ }
+ else {
+ msgpack_pack_object(packer, kv->key);
+ }
+ msgpack_pack_object(packer, kv->val);
+ }
+ return FLB_FILTER_MODIFIED;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size - conflict_keys);
+
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+ // If this kv->key matches rule->val it's a conflict source key and
+ // will be skipped
+ if (!kv_key_matches_str_rule_val(kv, rule)) {
+ if (kv_key_matches_str_rule_key(kv, rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ }
+ else {
+ msgpack_pack_object(packer, kv->key);
+ }
+
+ msgpack_pack_object(packer, kv->val);
+ }
+ }
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_COPY(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ int match_keys =
+ map_count_keys_matching_str(map, rule->key, rule->key_len);
+ int conflict_keys =
+ map_count_keys_matching_str(map, rule->val, rule->val_len);
+ int i;
+ msgpack_object_kv *kv;
+
+ if (match_keys < 1) {
+ flb_plg_debug(ctx->ins, "Rule COPY %s TO %s : No keys matching %s "
+ "found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (match_keys > 1) {
+ flb_plg_debug(ctx->ins, "Rule COPY %s TO %s : Multiple keys matching "
+ "%s found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (conflict_keys > 0) {
+ flb_plg_debug(ctx->ins, "Rule COPY %s TO %s : Existing keys matching "
+ "target %s found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size + 1);
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+
+ msgpack_pack_object(packer, kv->key);
+ msgpack_pack_object(packer, kv->val);
+
+ if (kv_key_matches_str_rule_key(kv, rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ msgpack_pack_object(packer, kv->val);
+ }
+ }
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_HARD_COPY(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ int i;
+
+ int match_keys =
+ map_count_keys_matching_str(map, rule->key, rule->key_len);
+ int conflict_keys =
+ map_count_keys_matching_str(map, rule->val, rule->val_len);
+ msgpack_object_kv *kv;
+
+ if (match_keys < 1) {
+ flb_plg_debug(ctx->ins, "Rule HARD_COPY %s TO %s : No keys matching %s "
+ "found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (match_keys > 1) {
+ flb_plg_warn(ctx->ins, "Rule HARD_COPY %s TO %s : Multiple keys "
+ "matching %s found, not applying rule",
+ rule->key, rule->val, rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (conflict_keys > 1) {
+ flb_plg_warn(ctx->ins, "Rule HARD_COPY %s TO %s : Multiple target keys "
+ "matching %s found, not applying rule",
+ rule->key, rule->val, rule->val);
+ return FLB_FILTER_NOTOUCH;
+ }
+ else if (conflict_keys == 0) {
+ msgpack_pack_map(packer, map->via.map.size + 1);
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+ msgpack_pack_object(packer, kv->key);
+ msgpack_pack_object(packer, kv->val);
+
+ // This is our copy
+ if (kv_key_matches_str_rule_key(kv, rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ msgpack_pack_object(packer, kv->val);
+ }
+ }
+ return FLB_FILTER_MODIFIED;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size);
+
+ for (i = 0; i < map->via.map.size; i++) {
+ kv = &map->via.map.ptr[i];
+
+ // Skip the conflict key, we will create a new one
+ if (!kv_key_matches_str_rule_val(kv, rule)) {
+ msgpack_pack_object(packer, kv->key);
+ msgpack_pack_object(packer, kv->val);
+
+ // This is our copy
+ if (kv_key_matches_str_rule_key(kv, rule)) {
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ msgpack_pack_object(packer, kv->val);
+ }
+ }
+ }
+
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_ADD(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ if (map_count_keys_matching_str(map, rule->key, rule->key_len) == 0) {
+ msgpack_pack_map(packer, map->via.map.size + 1);
+ map_pack_each(packer, map);
+ helper_pack_string(ctx, packer, rule->key, rule->key_len);
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ return FLB_FILTER_MODIFIED;
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Rule ADD %s : this key already exists, "
+ "skipping", rule->key);
+ return FLB_FILTER_NOTOUCH;
+ }
+}
+
+static inline int apply_rule_SET(struct filter_modify_ctx *ctx,
+ msgpack_packer * packer,
+ msgpack_object * map,
+ struct modify_rule *rule)
+{
+ int matches = map_count_keys_matching_str(map, rule->key, rule->key_len);
+
+ msgpack_pack_map(packer, map->via.map.size - matches + 1);
+
+ if (matches == 0) {
+ map_pack_each(packer, map);
+ helper_pack_string(ctx, packer, rule->key, rule->key_len);
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ }
+ else {
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_str_rule_key);
+ helper_pack_string(ctx, packer, rule->key, rule->key_len);
+ helper_pack_string(ctx, packer, rule->val, rule->val_len);
+ }
+
+ return FLB_FILTER_MODIFIED;
+}
+
+static inline int apply_rule_REMOVE(msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ int matches = map_count_keys_matching_str(map, rule->key, rule->key_len);
+
+ if (matches == 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size - matches);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_str_rule_key);
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_REMOVE_WILDCARD(msgpack_packer * packer,
+ msgpack_object * map,
+ struct modify_rule *rule)
+{
+ int matches =
+ map_count_keys_matching_wildcard(map, rule->key, rule->key_len);
+
+ if (matches == 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size - matches);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_wildcard_rule_key);
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_REMOVE_REGEX(msgpack_packer * packer,
+ msgpack_object * map,
+ struct modify_rule *rule)
+{
+ int matches = map_count_keys_matching_regex(map, rule->key_regex);
+
+ if (matches == 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size - matches);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_regex_rule_key);
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_MOVE_TO_END(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+
+ int match_keys =
+ map_count_keys_matching_wildcard(map, rule->key, rule->key_len);
+
+ if (match_keys == 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_wildcard_rule_key);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_matches_wildcard_rule_key);
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_rule_MOVE_TO_START(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+
+ int match_keys =
+ map_count_keys_matching_wildcard(map, rule->key, rule->key_len);
+
+ if (match_keys == 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ else {
+ msgpack_pack_map(packer, map->via.map.size);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_matches_wildcard_rule_key);
+ map_pack_each_fn(packer, map, rule,
+ kv_key_does_not_match_wildcard_rule_key);
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static inline int apply_modifying_rule(struct filter_modify_ctx *ctx,
+ msgpack_packer *packer,
+ msgpack_object *map,
+ struct modify_rule *rule)
+{
+ switch (rule->ruletype) {
+ case RENAME:
+ return apply_rule_RENAME(ctx, packer, map, rule);
+ case HARD_RENAME:
+ return apply_rule_HARD_RENAME(ctx, packer, map, rule);
+ case ADD:
+ return apply_rule_ADD(ctx, packer, map, rule);
+ case SET:
+ return apply_rule_SET(ctx, packer, map, rule);
+ case REMOVE:
+ return apply_rule_REMOVE(packer, map, rule);
+ case REMOVE_WILDCARD:
+ return apply_rule_REMOVE_WILDCARD(packer, map, rule);
+ case REMOVE_REGEX:
+ return apply_rule_REMOVE_REGEX(packer, map, rule);
+ case COPY:
+ return apply_rule_COPY(ctx, packer, map, rule);
+ case HARD_COPY:
+ return apply_rule_HARD_COPY(ctx, packer, map, rule);
+ case MOVE_TO_START:
+ return apply_rule_MOVE_TO_START(ctx, packer, map, rule);
+ case MOVE_TO_END:
+ return apply_rule_MOVE_TO_END(ctx, packer, map, rule);
+ default:
+ flb_plg_warn(ctx->ins, "Unknown ruletype for rule with key %s, ignoring",
+ rule->key);
+ }
+ return FLB_FILTER_NOTOUCH;
+}
+
+
+
+static inline int apply_modifying_rules(
+ struct flb_log_event_encoder *log_encoder,
+ struct flb_log_event *log_event,
+ struct filter_modify_ctx *ctx)
+{
+ int ret;
+ int records_in;
+ msgpack_object map;
+ struct modify_rule *rule;
+ msgpack_sbuffer sbuffer;
+ msgpack_packer in_packer;
+ msgpack_unpacker unpacker;
+ msgpack_unpacked unpacked;
+ int initial_buffer_size = 1024 * 8;
+ int new_buffer_size = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ bool has_modifications = false;
+
+ map = *log_event->body;
+ records_in = map.via.map.size;
+
+ if (!evaluate_conditions(&map, ctx)) {
+ flb_plg_debug(ctx->ins, "Conditions not met, not touching record");
+ return 0;
+ }
+
+ msgpack_sbuffer_init(&sbuffer);
+ msgpack_packer_init(&in_packer, &sbuffer, msgpack_sbuffer_write);
+ msgpack_unpacked_init(&unpacked);
+
+ if (!msgpack_unpacker_init(&unpacker, initial_buffer_size)) {
+ flb_plg_error(ctx->ins, "Unable to allocate memory for unpacker, aborting");
+ return -1;
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->rules) {
+ rule = mk_list_entry(head, struct modify_rule, _head);
+
+ msgpack_sbuffer_clear(&sbuffer);
+
+ if (apply_modifying_rule(ctx, &in_packer, &map, rule) !=
+ FLB_FILTER_NOTOUCH) {
+
+ has_modifications = true;
+ new_buffer_size = sbuffer.size * 2;
+
+ if (msgpack_unpacker_buffer_capacity(&unpacker) < new_buffer_size) {
+ if (!msgpack_unpacker_reserve_buffer
+ (&unpacker, new_buffer_size)) {
+ flb_plg_error(ctx->ins, "Unable to re-allocate memory for "
+ "unpacker, aborting");
+ return -1;
+ }
+ }
+
+ memcpy(msgpack_unpacker_buffer(&unpacker), sbuffer.data,
+ sbuffer.size);
+ msgpack_unpacker_buffer_consumed(&unpacker, sbuffer.size);
+
+ msgpack_unpacker_next(&unpacker, &unpacked);
+
+ if (unpacked.data.type == MSGPACK_OBJECT_MAP) {
+ map = unpacked.data;
+ }
+ else {
+ flb_plg_error(ctx->ins, "Expected MSGPACK_MAP, this is not a "
+ "valid return value, skipping");
+ }
+ }
+ }
+
+ if (has_modifications) {
+ ret = flb_log_event_encoder_begin_record(log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ log_encoder, &log_event->timestamp);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
+ log_encoder, log_event->metadata);
+ }
+
+ flb_plg_trace(ctx->ins, "Input map size %d elements, output map size "
+ "%d elements", records_in, map.via.map.size);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_body_from_msgpack_object(
+ log_encoder, &map);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(log_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
+
+ flb_log_event_encoder_rollback_record(log_encoder);
+
+ has_modifications = FLB_FALSE;
+ }
+ }
+
+ msgpack_unpacked_destroy(&unpacked);
+ msgpack_unpacker_destroy(&unpacker);
+ msgpack_sbuffer_destroy(&sbuffer);
+
+ return has_modifications ? 1 : 0;
+
+}
+
+static int cb_modify_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config, void *data)
+{
+ struct filter_modify_ctx *ctx;
+
+ // Create context
+ ctx = flb_malloc(sizeof(struct filter_modify_ctx));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ mk_list_init(&ctx->conditions);
+ mk_list_init(&ctx->rules);
+ ctx->ins = f_ins;
+ ctx->rules_cnt = 0;
+ ctx->conditions_cnt = 0;
+
+ if (setup(ctx, f_ins, config) < 0) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ // Set context
+ flb_filter_set_context(f_ins, ctx);
+ return 0;
+}
+
+static int cb_modify_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t * out_size,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *context, struct flb_config *config)
+{
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ struct filter_modify_ctx *ctx = context;
+ int modifications = 0;
+ int total_modifications = 0;
+ int ret;
+
+ (void) f_ins;
+ (void) i_ins;
+ (void) config;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ modifications =
+ apply_modifying_rules(&log_encoder, &log_event, ctx);
+
+ if (modifications == 0) {
+ /* not matched, so copy original event. */
+ ret = flb_log_event_encoder_emit_raw_record(
+ &log_encoder,
+ log_decoder.record_base,
+ log_decoder.record_length);
+ }
+
+ total_modifications += modifications;
+ }
+
+ if(total_modifications > 0) {
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_size = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+ }
+ else {
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+}
+
+static int cb_modify_exit(void *data, struct flb_config *config)
+{
+ struct filter_modify_ctx *ctx = data;
+
+ teardown(ctx);
+ flb_free(ctx);
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "Set", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Add a key/value pair with key KEY and value VALUE. "
+ "If KEY already exists, this field is overwritten."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Add", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Add a key/value pair with key KEY and value VALUE if KEY does not exist"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Remove", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Remove a key/value pair with key KEY if it exists"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Remove_wildcard", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Remove all key/value pairs with key matching wildcard KEY"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Remove_regex", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Remove all key/value pairs with key matching regexp KEY"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Move_To_Start", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Move key/value pairs with keys matching KEY to the start of the message"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Move_To_End", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Move key/value pairs with keys matching KEY to the end of the message"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Rename", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Rename a key/value pair with key KEY to RENAMED_KEY "
+ "if KEY exists AND RENAMED_KEY does not exist"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Hard_Rename", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Rename a key/value pair with key KEY to RENAMED_KEY if KEY exists. "
+ "If RENAMED_KEY already exists, this field is overwritten"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Copy", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Copy a key/value pair with key KEY to COPIED_KEY "
+ "if KEY exists AND COPIED_KEY does not exist"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Hard_copy", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Copy a key/value pair with key KEY to COPIED_KEY if KEY exists. "
+ "If COPIED_KEY already exists, this field is overwritten"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "Condition", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Set the condition to modify. Key_exists, Key_does_not_exist, A_key_matches, "
+ "No_key_matches, Key_value_equals, Key_value_does_not_equal, Key_value_matches, "
+ "Key_value_does_not_match, Matching_keys_have_matching_values "
+ "and Matching_keys_do_not_have_matching_values are supported."
+ },
+ {0}
+};
+
+struct flb_filter_plugin filter_modify_plugin = {
+ .name = "modify",
+ .description = "modify records by applying rules",
+ .cb_init = cb_modify_init,
+ .cb_filter = cb_modify_filter,
+ .cb_exit = cb_modify_exit,
+ .config_map = config_map,
+ .flags = 0
+};