diff options
Diffstat (limited to 'fluent-bit/plugins/out_logdna/logdna.c')
-rw-r--r-- | fluent-bit/plugins/out_logdna/logdna.c | 591 |
1 files changed, 591 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_logdna/logdna.c b/fluent-bit/plugins/out_logdna/logdna.c new file mode 100644 index 000000000..e3ab9f56f --- /dev/null +++ b/fluent-bit/plugins/out_logdna/logdna.c @@ -0,0 +1,591 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_mp.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_env.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_log_event_decoder.h> + +#include "logdna.h" + +static inline int primary_key_check(msgpack_object k, char *name, int len) +{ + if (k.type != MSGPACK_OBJECT_STR) { + return FLB_FALSE; + } + + if (k.via.str.size != len) { + return FLB_FALSE; + } + + if (memcmp(k.via.str.ptr, name, len) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * This function looks for the following keys and add them to the buffer + * + * - level or severity + * - file + * - app + * - meta + */ +static int record_append_primary_keys(struct flb_logdna *ctx, + msgpack_object *map, + msgpack_packer *mp_sbuf) +{ + int i; + int c = 0; + msgpack_object *level = NULL; + msgpack_object *file = NULL; + msgpack_object *app = NULL; + msgpack_object *meta = NULL; + msgpack_object k; + msgpack_object v; + + for (i = 0; i < map->via.array.size; i++) { + k = map->via.map.ptr[i].key; + v = map->via.map.ptr[i].val; + + /* Level - optional */ + if (!level && + (primary_key_check(k, "level", 5) == FLB_TRUE || + primary_key_check(k, "severity", 8) == FLB_TRUE)) { + level = &k; + msgpack_pack_str(mp_sbuf, 5); + msgpack_pack_str_body(mp_sbuf, "level", 5); + msgpack_pack_object(mp_sbuf, v); + c++; + } + + /* Meta - optional */ + if (!meta && primary_key_check(k, "meta", 4) == FLB_TRUE) { + meta = &k; + msgpack_pack_str(mp_sbuf, 4); + msgpack_pack_str_body(mp_sbuf, "meta", 4); + msgpack_pack_object(mp_sbuf, v); + c++; + } + + /* File */ + if (!file && primary_key_check(k, "file", 4) == FLB_TRUE) { + file = &k; + msgpack_pack_str(mp_sbuf, 4); + msgpack_pack_str_body(mp_sbuf, "file", 4); + msgpack_pack_object(mp_sbuf, v); + c++; + } + + /* App */ + if (primary_key_check(k, "app", 3) == FLB_TRUE) { + app = &k; + msgpack_pack_str(mp_sbuf, 3); + msgpack_pack_str_body(mp_sbuf, "app", 3); + msgpack_pack_object(mp_sbuf, v); + c++; + } + } + + /* Set the global file name if the record did not provided one */ + if (!file && ctx->file) { + msgpack_pack_str(mp_sbuf, 4); + msgpack_pack_str_body(mp_sbuf, "file", 4); + msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->file)); + msgpack_pack_str_body(mp_sbuf, ctx->file, flb_sds_len(ctx->file)); + c++; + } + + + /* If no application name is set, set the default */ + if (!app) { + msgpack_pack_str(mp_sbuf, 3); + msgpack_pack_str_body(mp_sbuf, "app", 3); + msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->app)); + msgpack_pack_str_body(mp_sbuf, ctx->app, flb_sds_len(ctx->app)); + c++; + } + + return c; +} + +static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx, + const void *data, size_t bytes, + const char *tag, int tag_len) +{ + int ret; + int len; + int total_lines; + int array_size = 0; + off_t map_off; + char *line_json; + flb_sds_t json; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return NULL; + } + + /* Count number of records */ + total_lines = flb_mp_count(data, bytes); + + /* Initialize msgpack buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 1); + + msgpack_pack_str(&mp_pck, 5); + msgpack_pack_str_body(&mp_pck, "lines", 5); + + msgpack_pack_array(&mp_pck, total_lines); + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map_off = mp_sbuf.size; + + array_size = 2; + msgpack_pack_map(&mp_pck, array_size); + + /* + * Append primary keys found, the return values is the number of appended + * keys to the record, we use that to adjust the map header size. + */ + ret = record_append_primary_keys(ctx, log_event.body, &mp_pck); + array_size += ret; + + /* Timestamp */ + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "timestamp", 9); + msgpack_pack_int(&mp_pck, (int) flb_time_to_double(&log_event.timestamp)); + + /* Line */ + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "line", 4); + + line_json = flb_msgpack_to_json_str(1024, log_event.body); + len = strlen(line_json); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, line_json, len); + flb_free(line_json); + + /* Adjust map header size */ + flb_mp_set_map_header_size(mp_sbuf.data + map_off, array_size); + } + + flb_log_event_decoder_destroy(&log_decoder); + + json = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + + return json; +} + +static void logdna_config_destroy(struct flb_logdna *ctx) +{ + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->tags_formatted) { + flb_sds_destroy(ctx->tags_formatted); + } + + flb_free(ctx); +} + +static struct flb_logdna *logdna_config_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + int len = 0; + char *hostname; + flb_sds_t tmp; + flb_sds_t encoded; + struct mk_list *head; + struct flb_slist_entry *tag_entry; + struct flb_logdna *ctx; + struct flb_upstream *upstream; + + /* Create context */ + ctx = flb_calloc(1, sizeof(struct flb_logdna)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + logdna_config_destroy(ctx); + return NULL; + } + + /* validate API key */ + if (!ctx->api_key) { + flb_plg_error(ins, "no `api_key` was set, this is a mandatory property"); + logdna_config_destroy(ctx); + return NULL; + } + + /* + * Tags: this value is a linked list of values created by the config map + * reader. + */ + if (ctx->tags) { + /* For every tag, make sure no empty spaces exists */ + mk_list_foreach(head, ctx->tags) { + tag_entry = mk_list_entry(head, struct flb_slist_entry, _head); + len += flb_sds_len(tag_entry->str) + 1; + } + + /* Compose a full tag for URI request */ + ctx->tags_formatted = flb_sds_create_size(len); + if (!ctx->tags_formatted) { + logdna_config_destroy(ctx); + return NULL; + } + + mk_list_foreach(head, ctx->tags) { + tag_entry = mk_list_entry(head, struct flb_slist_entry, _head); + + encoded = flb_uri_encode(tag_entry->str, + flb_sds_len(tag_entry->str)); + tmp = flb_sds_cat(ctx->tags_formatted, + encoded, flb_sds_len(encoded)); + ctx->tags_formatted = tmp; + flb_sds_destroy(encoded); + + if (tag_entry->_head.next != ctx->tags) { + tmp = flb_sds_cat(ctx->tags_formatted, ",", 1); + ctx->tags_formatted = tmp; + } + } + } + + /* + * Hostname: if the hostname was not set manually, try to get it from the + * environment variable. + * + * Note that hostname is populated by a config map, and config maps are + * immutable so we use an internal variable to do a final composition + * if required. + */ + if (!ctx->hostname) { + tmp = NULL; + hostname = (char *) flb_env_get(config->env, "HOSTNAME"); + if (hostname) { + ctx->_hostname = flb_sds_create(hostname); + } + else { + ctx->_hostname = flb_sds_create("unknown"); + } + } + else { + ctx->_hostname = flb_sds_create(ctx->hostname); + } + + /* Bail if unsuccessful hostname creation */ + if (!ctx->_hostname) { + flb_free(ctx); + return NULL; + } + + /* Create Upstream connection context */ + upstream = flb_upstream_create(config, + ctx->logdna_host, + ctx->logdna_port, + FLB_IO_TLS, ins->tls); + if (!upstream) { + flb_free(ctx); + return NULL; + } + ctx->u = upstream; + flb_output_upstream_set(ctx->u, ins); + + /* Set networking defaults */ + flb_output_net_default(FLB_LOGDNA_HOST, atoi(FLB_LOGDNA_PORT), ins); + return ctx; +} + +static int cb_logdna_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_logdna *ctx; + + ctx = logdna_config_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "cannot initialize configuration"); + return -1; + } + + flb_output_set_context(ins, ctx); + + /* + * This plugin instance uses the HTTP client interface, let's register + * it debugging callbacks. + */ + flb_output_set_http_debug_callbacks(ins); + + flb_plg_info(ins, "configured, hostname=%s", ctx->hostname); + return 0; +} + +static void cb_logdna_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int out_ret = FLB_OK; + size_t b_sent; + flb_sds_t uri; + flb_sds_t tmp; + flb_sds_t payload; + struct flb_logdna *ctx = out_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + + /* Format the data to the expected LogDNA Payload */ + payload = logdna_compose_payload(ctx, + event_chunk->data, + event_chunk->size, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (!payload) { + flb_plg_error(ctx->ins, "cannot compose request payload"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Lookup an available connection context */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available"); + flb_sds_destroy(payload); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Compose the HTTP URI */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_plg_error(ctx->ins, "cannot allocate buffer for URI"); + flb_sds_destroy(payload); + flb_free(ctx); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + tmp = flb_sds_printf(&uri, + "/logs/ingest?hostname=%s&mac=%s&ip=%s&now=%lu&tags=%s", + ctx->_hostname, + ctx->mac_addr, + ctx->ip_addr, + time(NULL), + ctx->tags_formatted); + if (!tmp) { + flb_plg_error(ctx->ins, "error formatting URI"); + flb_sds_destroy(payload); + flb_free(ctx); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_POST, uri, + payload, flb_sds_len(payload), + ctx->logdna_host, ctx->logdna_port, + NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_sds_destroy(uri); + flb_sds_destroy(payload); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Set callback context to the HTTP client context */ + flb_http_set_callback_context(c, ctx->ins->callback); + + /* User Agent */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + /* Add Content-Type header */ + flb_http_add_header(c, + FLB_LOGDNA_CT, sizeof(FLB_LOGDNA_CT) - 1, + FLB_LOGDNA_CT_JSON, sizeof(FLB_LOGDNA_CT_JSON) - 1); + + /* Add auth */ + flb_http_basic_auth(c, ctx->api_key, ""); + + flb_http_strip_port_from_host(c); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + + /* Destroy buffers */ + flb_sds_destroy(uri); + flb_sds_destroy(payload); + + /* Validate HTTP client return status */ + if (ret == 0) { + /* + * Only allow the following HTTP status: + * + * - 200: OK + * - 201: Created + * - 202: Accepted + * - 203: no authorative resp + * - 204: No Content + * - 205: Reset content + * + */ + if (c->resp.status < 200 || c->resp.status > 205) { + if (c->resp.payload) { + flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", + ctx->logdna_host, ctx->logdna_port, c->resp.status, + c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", + ctx->logdna_host, ctx->logdna_port, c->resp.status); + } + out_ret = FLB_RETRY; + } + else { + if (c->resp.payload) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s", + ctx->logdna_host, ctx->logdna_port, + c->resp.status, c->resp.payload); + } + else { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i", + ctx->logdna_host, ctx->logdna_port, + c->resp.status); + } + } + } + else { + flb_plg_error(ctx->ins, "could not flush records to %s:%s (http_do=%i)", + FLB_LOGDNA_HOST, FLB_LOGDNA_PORT, ret); + out_ret = FLB_RETRY; + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(out_ret); +} + +static int cb_logdna_exit(void *data, struct flb_config *config) +{ + struct flb_logdna *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->_hostname) { + flb_sds_destroy(ctx->_hostname); + } + logdna_config_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "logdna_host", FLB_LOGDNA_HOST, + 0, FLB_TRUE, offsetof(struct flb_logdna, logdna_host), + "LogDNA Host address" + }, + + { + FLB_CONFIG_MAP_INT, "logdna_port", FLB_LOGDNA_PORT, + 0, FLB_TRUE, offsetof(struct flb_logdna, logdna_port), + "LogDNA TCP port" + }, + + { + FLB_CONFIG_MAP_STR, "api_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_logdna, api_key), + "Logdna API key" + }, + + { + FLB_CONFIG_MAP_STR, "hostname", NULL, + 0, FLB_TRUE, offsetof(struct flb_logdna, hostname), + "Local Server or device host name" + }, + + { + FLB_CONFIG_MAP_STR, "mac", "", + 0, FLB_TRUE, offsetof(struct flb_logdna, mac_addr), + "MAC address (optional)" + }, + + { + FLB_CONFIG_MAP_STR, "ip", "", + 0, FLB_TRUE, offsetof(struct flb_logdna, ip_addr), + "IP address (optional)" + }, + + { + FLB_CONFIG_MAP_CLIST, "tags", "", + 0, FLB_TRUE, offsetof(struct flb_logdna, tags), + "Tags (optional)" + }, + + { + FLB_CONFIG_MAP_STR, "file", NULL, + 0, FLB_TRUE, offsetof(struct flb_logdna, file), + "Name of the monitored file (optional)" + }, + + { + FLB_CONFIG_MAP_STR, "app", "Fluent Bit", + 0, FLB_TRUE, offsetof(struct flb_logdna, app), + "Name of the application generating the data (optional)" + }, + + /* EOF */ + {0} + +}; + +/* Plugin reference */ +struct flb_output_plugin out_logdna_plugin = { + .name = "logdna", + .description = "LogDNA", + .cb_init = cb_logdna_init, + .cb_flush = cb_logdna_flush, + .cb_exit = cb_logdna_exit, + .config_map = config_map, + .flags = FLB_OUTPUT_NET | FLB_IO_TLS, +}; |