summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_geoip2/geoip2.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_geoip2/geoip2.c')
-rw-r--r--src/fluent-bit/plugins/filter_geoip2/geoip2.c519
1 files changed, 519 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_geoip2/geoip2.c b/src/fluent-bit/plugins/filter_geoip2/geoip2.c
new file mode 100644
index 000000000..28559dfef
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_geoip2/geoip2.c
@@ -0,0 +1,519 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_hash_table.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <msgpack.h>
+
+#include "geoip2.h"
+
+static int configure(struct geoip2_ctx *ctx,
+ struct flb_filter_instance *f_ins)
+{
+ struct flb_kv *kv = NULL;
+ struct mk_list *head = NULL;
+ struct mk_list *split;
+ int status;
+ struct geoip2_record *record;
+ struct flb_split_entry *sentry;
+ struct flb_config_map_val *record_key;
+ int ret;
+
+ ctx->mmdb = flb_malloc(sizeof(MMDB_s));
+ ctx->lookup_keys_num = 0;
+ ctx->records_num = 0;
+
+ ret = flb_filter_config_map_set(f_ins, (void *)ctx);
+ if (ret == -1) {
+ flb_plg_error(f_ins, "unable to load configuration");
+ flb_free(ctx->mmdb);
+ return -1;
+ }
+
+ if (ctx->database) {
+ status = MMDB_open(ctx->database, MMDB_MODE_MMAP, ctx->mmdb);
+ if (status != MMDB_SUCCESS) {
+ flb_plg_error(f_ins, "Cannot open geoip2 database: %s: %s",
+ ctx->database, MMDB_strerror(status));
+ flb_free(ctx->mmdb);
+ return -1;
+ }
+ } else {
+ flb_plg_error(f_ins, "no geoip2 database has been loaded");
+ flb_free(ctx->mmdb);
+ return -1;
+ }
+
+ mk_list_foreach(head, ctx->lookup_keys) {
+ ctx->lookup_keys_num++;
+ }
+
+ flb_config_map_foreach(head, record_key, ctx->record_keys) {
+ record = flb_malloc(sizeof(struct geoip2_record));
+ if (!record) {
+ flb_errno();
+ continue;
+ }
+ split = flb_utils_split(record_key->val.str, ' ', 2);
+ if (mk_list_size(split) != 3) {
+ flb_plg_error(f_ins, "invalid record parameter: '%s'", kv->val);
+ flb_plg_error(f_ins, "expects 'KEY LOOKUP_KEY VALUE'");
+ flb_free(record);
+ flb_utils_split_free(split);
+ continue;
+ }
+
+ /* Get first value (field) */
+ sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
+ record->key = flb_strndup(sentry->value, sentry->len);
+ record->key_len = sentry->len;
+
+ sentry = mk_list_entry_next(&sentry->_head, struct flb_split_entry,
+ _head, split);
+ record->lookup_key = flb_strndup(sentry->value, sentry->len);
+ record->lookup_key_len = sentry->len;
+
+ sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
+ record->val = flb_strndup(sentry->value, sentry->len);
+ record->val_len = sentry->len;
+
+ flb_utils_split_free(split);
+ mk_list_add(&record->_head, &ctx->records);
+ ctx->records_num++;
+ }
+
+ if (ctx->lookup_keys_num <= 0) {
+ flb_plg_error(f_ins, "at least one lookup_key is required");
+ return -1;
+ }
+ if (ctx->records_num <= 0) {
+ flb_plg_error(f_ins, "at least one record is required");
+ return -1;
+ }
+ return 0;
+}
+
+static int delete_list(struct geoip2_ctx *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct geoip2_record *record;
+
+ mk_list_foreach_safe(head, tmp, &ctx->records) {
+ record = mk_list_entry(head, struct geoip2_record, _head);
+ flb_free(record->lookup_key);
+ flb_free(record->key);
+ flb_free(record->val);
+ mk_list_del(&record->_head);
+ flb_free(record);
+ }
+ return 0;
+}
+
+static struct flb_hash_table *prepare_lookup_keys(msgpack_object *map,
+ struct geoip2_ctx *ctx)
+{
+ msgpack_object_kv *kv;
+ msgpack_object *key;
+ msgpack_object *val;
+ struct mk_list *head;
+ struct flb_config_map_val *lookup_key;
+ struct flb_hash_table *ht;
+
+ ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, ctx->lookup_keys_num, -1);
+ if (!ht) {
+ return NULL;
+ }
+
+ kv = map->via.map.ptr;
+ for (int i = 0; i < map->via.map.size; i++) {
+ key = &(kv + i)->key;
+ val = &(kv + i)->val;
+ if (key->type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+ if (val->type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ flb_config_map_foreach(head, lookup_key, ctx->lookup_keys) {
+ if (strncasecmp(key->via.str.ptr, lookup_key->val.str,
+ flb_sds_len(lookup_key->val.str)) == 0) {
+ flb_hash_table_add(ht, lookup_key->val.str, flb_sds_len(lookup_key->val.str),
+ (void *) val->via.str.ptr, val->via.str.size);
+ }
+ }
+ }
+
+ return ht;
+}
+
+static MMDB_lookup_result_s mmdb_lookup(struct geoip2_ctx *ctx, const char *ip)
+{
+ int gai_error;
+ int mmdb_error;
+ MMDB_lookup_result_s result;
+
+ result = MMDB_lookup_string(ctx->mmdb, ip, &gai_error, &mmdb_error);
+ if (gai_error != 0) {
+ flb_plg_error(ctx->ins, "getaddrinfo failed: %s", gai_strerror(gai_error));
+ }
+ if (mmdb_error != MMDB_SUCCESS) {
+ flb_plg_error(ctx->ins, "lookup failed : %s", MMDB_strerror(mmdb_error));
+ }
+
+ return result;
+}
+
+static void add_geoip_fields(msgpack_object *map,
+ struct flb_hash_table *lookup_keys,
+ struct geoip2_ctx *ctx,
+ struct flb_log_event_encoder *encoder)
+{
+ int ret;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct geoip2_record *record;
+ const char *ip;
+ size_t ip_size;
+ MMDB_lookup_result_s result;
+ MMDB_entry_s entry;
+ MMDB_entry_data_s entry_data;
+ char **path;
+ int status;
+ char *pos;
+ char key[64];
+ struct mk_list *split;
+ int split_size;
+ struct mk_list *path_head;
+ struct mk_list *path_tmp;
+ struct flb_split_entry *sentry;
+ int i = 0;
+
+ mk_list_foreach_safe(head, tmp, &ctx->records) {
+ record = mk_list_entry(head, struct geoip2_record, _head);
+
+ flb_log_event_encoder_append_body_string(
+ encoder, record->key, record->key_len);
+
+ ret = flb_hash_table_get(lookup_keys, record->lookup_key, record->lookup_key_len,
+ (void *) &ip, &ip_size);
+ if (ret == -1) {
+ flb_log_event_encoder_append_body_null(encoder);
+ continue;
+ }
+
+ result = mmdb_lookup(ctx, ip);
+ if (!result.found_entry) {
+ flb_log_event_encoder_append_body_null(encoder);
+ continue;
+ }
+ entry = result.entry;
+ pos = strstr(record->val, "}");
+ memset(key, '\0', sizeof(key));
+ strncpy(key, record->val + 2, pos - (record->val + 2));
+ split = flb_utils_split(key, '.', 2);
+ split_size = mk_list_size(split);
+ path = flb_malloc(sizeof(char *) * (split_size + 1));
+ i = 0;
+ mk_list_foreach_safe(path_head, path_tmp, split) {
+ sentry = mk_list_entry(path_head, struct flb_split_entry, _head);
+ path[i] = flb_strndup(sentry->value, sentry->len);
+ i++;
+ }
+ path[split_size] = NULL;
+ status = MMDB_aget_value(&entry, &entry_data, (const char *const *const)path);
+ flb_utils_split_free(split);
+ for (int j = 0; j < split_size; j++) {
+ flb_free(path[j]);
+ }
+ flb_free(path);
+ if (status != MMDB_SUCCESS) {
+ flb_plg_warn(ctx->ins, "cannot get value: %s", MMDB_strerror(status));
+ flb_log_event_encoder_append_body_null(encoder);
+ continue;
+ }
+ if (!entry_data.has_data) {
+ flb_plg_warn(ctx->ins, "found entry does not have data");
+ flb_log_event_encoder_append_body_null(encoder);
+ continue;
+ }
+ if (entry_data.type == MMDB_DATA_TYPE_MAP ||
+ entry_data.type == MMDB_DATA_TYPE_ARRAY) {
+ flb_plg_warn(ctx->ins, "Not supported MAP and ARRAY");
+ flb_log_event_encoder_append_body_null(encoder);
+ continue;
+ }
+
+ switch (entry_data.type) {
+ case MMDB_DATA_TYPE_EXTENDED:
+ /* TODO: not implemented */
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_POINTER:
+ /* TODO: not implemented */
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_UTF8_STRING:
+ flb_log_event_encoder_append_body_string(
+ encoder,
+ (char *) entry_data.utf8_string,
+ entry_data.data_size);
+ break;
+ case MMDB_DATA_TYPE_DOUBLE:
+ flb_log_event_encoder_append_body_double(
+ encoder, entry_data.double_value);
+ break;
+ case MMDB_DATA_TYPE_BYTES:
+ flb_log_event_encoder_append_body_string(
+ encoder,
+ (char *) entry_data.bytes,
+ entry_data.data_size);
+ break;
+ case MMDB_DATA_TYPE_UINT16:
+ flb_log_event_encoder_append_body_uint16(
+ encoder, entry_data.uint16);
+ break;
+ case MMDB_DATA_TYPE_UINT32:
+ flb_log_event_encoder_append_body_uint32(
+ encoder, entry_data.uint32);
+ break;
+ case MMDB_DATA_TYPE_MAP:
+ /* TODO: not implemented */
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_INT32:
+ flb_log_event_encoder_append_body_int32(
+ encoder, entry_data.int32);
+ break;
+ case MMDB_DATA_TYPE_UINT64:
+ flb_log_event_encoder_append_body_uint64(
+ encoder, entry_data.uint64);
+ break;
+ case MMDB_DATA_TYPE_UINT128:
+#if !(MMDB_UINT128_IS_BYTE_ARRAY)
+ /* entry_data.uint128; */
+ flb_warn("Not supported uint128");
+#else
+ flb_warn("Not implemented when MMDB_UINT128_IS_BYTE_ARRAY");
+#endif
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_ARRAY:
+ /* TODO: not implemented */
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_CONTAINER:
+ /* TODO: not implemented */
+ flb_log_event_encoder_append_body_null(encoder);
+ break;
+ case MMDB_DATA_TYPE_END_MARKER:
+ break;
+ case MMDB_DATA_TYPE_BOOLEAN:
+ flb_log_event_encoder_append_body_boolean(
+ encoder, (int) entry_data.boolean);
+ break;
+ case MMDB_DATA_TYPE_FLOAT:
+ flb_log_event_encoder_append_body_double(
+ encoder, entry_data.float_value);
+ break;
+ default:
+ flb_error("Unknown type: %d", entry_data.type);
+ break;
+ }
+ }
+}
+
+static int cb_geoip2_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config,
+ void *data)
+{
+ struct geoip2_ctx *ctx = NULL;
+ /* Create context */
+ ctx = flb_calloc(1, sizeof(struct geoip2_ctx));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ mk_list_init(&ctx->records);
+
+
+ if (configure(ctx, f_ins) < 0) {
+ delete_list(ctx);
+ return -1;
+ }
+
+ ctx->ins = f_ins;
+ flb_filter_set_context(f_ins, ctx);
+
+ return 0;
+}
+
+static int cb_geoip2_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_size,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *context,
+ struct flb_config *config)
+{
+ struct geoip2_ctx *ctx = context;
+ msgpack_object_kv *kv;
+ struct flb_hash_table *lookup_keys_hash;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+ int i;
+
+ (void) i_ins;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+ ret = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &log_encoder, &log_event.timestamp);
+ }
+
+ kv = log_event.body->via.map.ptr;
+ for (i = 0;
+ i < log_event.body->via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS ;
+ i++) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
+ }
+
+ lookup_keys_hash = prepare_lookup_keys(log_event.body, ctx);
+ add_geoip_fields(log_event.body, lookup_keys_hash, ctx, &log_encoder);
+ flb_hash_table_destroy(lookup_keys_hash);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&log_encoder);
+ }
+ }
+
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_size = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+}
+
+static int cb_geoip2_exit(void *data, struct flb_config *config)
+{
+ struct geoip2_ctx *ctx = data;
+
+ if (ctx != NULL) {
+ delete_list(ctx);
+ MMDB_close(ctx->mmdb);
+ flb_free(ctx->mmdb);
+ flb_free(ctx);
+ }
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "database", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct geoip2_ctx, database),
+ "Set the geoip2 database path"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "lookup_key", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct geoip2_ctx, lookup_keys),
+ "Add a lookup_key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "record", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct geoip2_ctx, record_keys),
+ "Add a record to the output base on geoip2"
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_filter_plugin filter_geoip2_plugin = {
+ .name = "geoip2",
+ .description = "add geoip information to records",
+ .cb_init = cb_geoip2_init,
+ .cb_filter = cb_geoip2_filter,
+ .cb_exit = cb_geoip2_exit,
+ .config_map = config_map,
+ .flags = 0,
+};