diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_logdna/logdna.c')
-rw-r--r-- | src/fluent-bit/plugins/out_logdna/logdna.c | 591 |
1 files changed, 0 insertions, 591 deletions
diff --git a/src/fluent-bit/plugins/out_logdna/logdna.c b/src/fluent-bit/plugins/out_logdna/logdna.c deleted file mode 100644 index e3ab9f56f..000000000 --- a/src/fluent-bit/plugins/out_logdna/logdna.c +++ /dev/null @@ -1,591 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_mp.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_env.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_log_event_decoder.h> - -#include "logdna.h" - -static inline int primary_key_check(msgpack_object k, char *name, int len) -{ - if (k.type != MSGPACK_OBJECT_STR) { - return FLB_FALSE; - } - - if (k.via.str.size != len) { - return FLB_FALSE; - } - - if (memcmp(k.via.str.ptr, name, len) == 0) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* - * This function looks for the following keys and add them to the buffer - * - * - level or severity - * - file - * - app - * - meta - */ -static int record_append_primary_keys(struct flb_logdna *ctx, - msgpack_object *map, - msgpack_packer *mp_sbuf) -{ - int i; - int c = 0; - msgpack_object *level = NULL; - msgpack_object *file = NULL; - msgpack_object *app = NULL; - msgpack_object *meta = NULL; - msgpack_object k; - msgpack_object v; - - for (i = 0; i < map->via.array.size; i++) { - k = map->via.map.ptr[i].key; - v = map->via.map.ptr[i].val; - - /* Level - optional */ - if (!level && - (primary_key_check(k, "level", 5) == FLB_TRUE || - primary_key_check(k, "severity", 8) == FLB_TRUE)) { - level = &k; - msgpack_pack_str(mp_sbuf, 5); - msgpack_pack_str_body(mp_sbuf, "level", 5); - msgpack_pack_object(mp_sbuf, v); - c++; - } - - /* Meta - optional */ - if (!meta && primary_key_check(k, "meta", 4) == FLB_TRUE) { - meta = &k; - msgpack_pack_str(mp_sbuf, 4); - msgpack_pack_str_body(mp_sbuf, "meta", 4); - msgpack_pack_object(mp_sbuf, v); - c++; - } - - /* File */ - if (!file && primary_key_check(k, "file", 4) == FLB_TRUE) { - file = &k; - msgpack_pack_str(mp_sbuf, 4); - msgpack_pack_str_body(mp_sbuf, "file", 4); - msgpack_pack_object(mp_sbuf, v); - c++; - } - - /* App */ - if (primary_key_check(k, "app", 3) == FLB_TRUE) { - app = &k; - msgpack_pack_str(mp_sbuf, 3); - msgpack_pack_str_body(mp_sbuf, "app", 3); - msgpack_pack_object(mp_sbuf, v); - c++; - } - } - - /* Set the global file name if the record did not provided one */ - if (!file && ctx->file) { - msgpack_pack_str(mp_sbuf, 4); - msgpack_pack_str_body(mp_sbuf, "file", 4); - msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->file)); - msgpack_pack_str_body(mp_sbuf, ctx->file, flb_sds_len(ctx->file)); - c++; - } - - - /* If no application name is set, set the default */ - if (!app) { - msgpack_pack_str(mp_sbuf, 3); - msgpack_pack_str_body(mp_sbuf, "app", 3); - msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->app)); - msgpack_pack_str_body(mp_sbuf, ctx->app, flb_sds_len(ctx->app)); - c++; - } - - return c; -} - -static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx, - const void *data, size_t bytes, - const char *tag, int tag_len) -{ - int ret; - int len; - int total_lines; - int array_size = 0; - off_t map_off; - char *line_json; - flb_sds_t json; - msgpack_packer mp_pck; - msgpack_sbuffer mp_sbuf; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return NULL; - } - - /* Count number of records */ - total_lines = flb_mp_count(data, bytes); - - /* Initialize msgpack buffers */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - msgpack_pack_map(&mp_pck, 1); - - msgpack_pack_str(&mp_pck, 5); - msgpack_pack_str_body(&mp_pck, "lines", 5); - - msgpack_pack_array(&mp_pck, total_lines); - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - map_off = mp_sbuf.size; - - array_size = 2; - msgpack_pack_map(&mp_pck, array_size); - - /* - * Append primary keys found, the return values is the number of appended - * keys to the record, we use that to adjust the map header size. - */ - ret = record_append_primary_keys(ctx, log_event.body, &mp_pck); - array_size += ret; - - /* Timestamp */ - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "timestamp", 9); - msgpack_pack_int(&mp_pck, (int) flb_time_to_double(&log_event.timestamp)); - - /* Line */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "line", 4); - - line_json = flb_msgpack_to_json_str(1024, log_event.body); - len = strlen(line_json); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, line_json, len); - flb_free(line_json); - - /* Adjust map header size */ - flb_mp_set_map_header_size(mp_sbuf.data + map_off, array_size); - } - - flb_log_event_decoder_destroy(&log_decoder); - - json = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - msgpack_sbuffer_destroy(&mp_sbuf); - - return json; -} - -static void logdna_config_destroy(struct flb_logdna *ctx) -{ - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->tags_formatted) { - flb_sds_destroy(ctx->tags_formatted); - } - - flb_free(ctx); -} - -static struct flb_logdna *logdna_config_create(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - int len = 0; - char *hostname; - flb_sds_t tmp; - flb_sds_t encoded; - struct mk_list *head; - struct flb_slist_entry *tag_entry; - struct flb_logdna *ctx; - struct flb_upstream *upstream; - - /* Create context */ - ctx = flb_calloc(1, sizeof(struct flb_logdna)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - - /* Load config map */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - logdna_config_destroy(ctx); - return NULL; - } - - /* validate API key */ - if (!ctx->api_key) { - flb_plg_error(ins, "no `api_key` was set, this is a mandatory property"); - logdna_config_destroy(ctx); - return NULL; - } - - /* - * Tags: this value is a linked list of values created by the config map - * reader. - */ - if (ctx->tags) { - /* For every tag, make sure no empty spaces exists */ - mk_list_foreach(head, ctx->tags) { - tag_entry = mk_list_entry(head, struct flb_slist_entry, _head); - len += flb_sds_len(tag_entry->str) + 1; - } - - /* Compose a full tag for URI request */ - ctx->tags_formatted = flb_sds_create_size(len); - if (!ctx->tags_formatted) { - logdna_config_destroy(ctx); - return NULL; - } - - mk_list_foreach(head, ctx->tags) { - tag_entry = mk_list_entry(head, struct flb_slist_entry, _head); - - encoded = flb_uri_encode(tag_entry->str, - flb_sds_len(tag_entry->str)); - tmp = flb_sds_cat(ctx->tags_formatted, - encoded, flb_sds_len(encoded)); - ctx->tags_formatted = tmp; - flb_sds_destroy(encoded); - - if (tag_entry->_head.next != ctx->tags) { - tmp = flb_sds_cat(ctx->tags_formatted, ",", 1); - ctx->tags_formatted = tmp; - } - } - } - - /* - * Hostname: if the hostname was not set manually, try to get it from the - * environment variable. - * - * Note that hostname is populated by a config map, and config maps are - * immutable so we use an internal variable to do a final composition - * if required. - */ - if (!ctx->hostname) { - tmp = NULL; - hostname = (char *) flb_env_get(config->env, "HOSTNAME"); - if (hostname) { - ctx->_hostname = flb_sds_create(hostname); - } - else { - ctx->_hostname = flb_sds_create("unknown"); - } - } - else { - ctx->_hostname = flb_sds_create(ctx->hostname); - } - - /* Bail if unsuccessful hostname creation */ - if (!ctx->_hostname) { - flb_free(ctx); - return NULL; - } - - /* Create Upstream connection context */ - upstream = flb_upstream_create(config, - ctx->logdna_host, - ctx->logdna_port, - FLB_IO_TLS, ins->tls); - if (!upstream) { - flb_free(ctx); - return NULL; - } - ctx->u = upstream; - flb_output_upstream_set(ctx->u, ins); - - /* Set networking defaults */ - flb_output_net_default(FLB_LOGDNA_HOST, atoi(FLB_LOGDNA_PORT), ins); - return ctx; -} - -static int cb_logdna_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - struct flb_logdna *ctx; - - ctx = logdna_config_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "cannot initialize configuration"); - return -1; - } - - flb_output_set_context(ins, ctx); - - /* - * This plugin instance uses the HTTP client interface, let's register - * it debugging callbacks. - */ - flb_output_set_http_debug_callbacks(ins); - - flb_plg_info(ins, "configured, hostname=%s", ctx->hostname); - return 0; -} - -static void cb_logdna_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - int ret; - int out_ret = FLB_OK; - size_t b_sent; - flb_sds_t uri; - flb_sds_t tmp; - flb_sds_t payload; - struct flb_logdna *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - - /* Format the data to the expected LogDNA Payload */ - payload = logdna_compose_payload(ctx, - event_chunk->data, - event_chunk->size, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (!payload) { - flb_plg_error(ctx->ins, "cannot compose request payload"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Lookup an available connection context */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - flb_plg_error(ctx->ins, "no upstream connections available"); - flb_sds_destroy(payload); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Compose the HTTP URI */ - uri = flb_sds_create_size(256); - if (!uri) { - flb_plg_error(ctx->ins, "cannot allocate buffer for URI"); - flb_sds_destroy(payload); - flb_free(ctx); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - tmp = flb_sds_printf(&uri, - "/logs/ingest?hostname=%s&mac=%s&ip=%s&now=%lu&tags=%s", - ctx->_hostname, - ctx->mac_addr, - ctx->ip_addr, - time(NULL), - ctx->tags_formatted); - if (!tmp) { - flb_plg_error(ctx->ins, "error formatting URI"); - flb_sds_destroy(payload); - flb_free(ctx); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Create HTTP client context */ - c = flb_http_client(u_conn, FLB_HTTP_POST, uri, - payload, flb_sds_len(payload), - ctx->logdna_host, ctx->logdna_port, - NULL, 0); - if (!c) { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - flb_sds_destroy(uri); - flb_sds_destroy(payload); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Set callback context to the HTTP client context */ - flb_http_set_callback_context(c, ctx->ins->callback); - - /* User Agent */ - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - - /* Add Content-Type header */ - flb_http_add_header(c, - FLB_LOGDNA_CT, sizeof(FLB_LOGDNA_CT) - 1, - FLB_LOGDNA_CT_JSON, sizeof(FLB_LOGDNA_CT_JSON) - 1); - - /* Add auth */ - flb_http_basic_auth(c, ctx->api_key, ""); - - flb_http_strip_port_from_host(c); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* Destroy buffers */ - flb_sds_destroy(uri); - flb_sds_destroy(payload); - - /* Validate HTTP client return status */ - if (ret == 0) { - /* - * Only allow the following HTTP status: - * - * - 200: OK - * - 201: Created - * - 202: Accepted - * - 203: no authorative resp - * - 204: No Content - * - 205: Reset content - * - */ - if (c->resp.status < 200 || c->resp.status > 205) { - if (c->resp.payload) { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->logdna_host, ctx->logdna_port, c->resp.status, - c->resp.payload); - } - else { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", - ctx->logdna_host, ctx->logdna_port, c->resp.status); - } - out_ret = FLB_RETRY; - } - else { - if (c->resp.payload) { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->logdna_host, ctx->logdna_port, - c->resp.status, c->resp.payload); - } - else { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i", - ctx->logdna_host, ctx->logdna_port, - c->resp.status); - } - } - } - else { - flb_plg_error(ctx->ins, "could not flush records to %s:%s (http_do=%i)", - FLB_LOGDNA_HOST, FLB_LOGDNA_PORT, ret); - out_ret = FLB_RETRY; - } - - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(out_ret); -} - -static int cb_logdna_exit(void *data, struct flb_config *config) -{ - struct flb_logdna *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->_hostname) { - flb_sds_destroy(ctx->_hostname); - } - logdna_config_destroy(ctx); - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "logdna_host", FLB_LOGDNA_HOST, - 0, FLB_TRUE, offsetof(struct flb_logdna, logdna_host), - "LogDNA Host address" - }, - - { - FLB_CONFIG_MAP_INT, "logdna_port", FLB_LOGDNA_PORT, - 0, FLB_TRUE, offsetof(struct flb_logdna, logdna_port), - "LogDNA TCP port" - }, - - { - FLB_CONFIG_MAP_STR, "api_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_logdna, api_key), - "Logdna API key" - }, - - { - FLB_CONFIG_MAP_STR, "hostname", NULL, - 0, FLB_TRUE, offsetof(struct flb_logdna, hostname), - "Local Server or device host name" - }, - - { - FLB_CONFIG_MAP_STR, "mac", "", - 0, FLB_TRUE, offsetof(struct flb_logdna, mac_addr), - "MAC address (optional)" - }, - - { - FLB_CONFIG_MAP_STR, "ip", "", - 0, FLB_TRUE, offsetof(struct flb_logdna, ip_addr), - "IP address (optional)" - }, - - { - FLB_CONFIG_MAP_CLIST, "tags", "", - 0, FLB_TRUE, offsetof(struct flb_logdna, tags), - "Tags (optional)" - }, - - { - FLB_CONFIG_MAP_STR, "file", NULL, - 0, FLB_TRUE, offsetof(struct flb_logdna, file), - "Name of the monitored file (optional)" - }, - - { - FLB_CONFIG_MAP_STR, "app", "Fluent Bit", - 0, FLB_TRUE, offsetof(struct flb_logdna, app), - "Name of the application generating the data (optional)" - }, - - /* EOF */ - {0} - -}; - -/* Plugin reference */ -struct flb_output_plugin out_logdna_plugin = { - .name = "logdna", - .description = "LogDNA", - .cb_init = cb_logdna_init, - .cb_flush = cb_logdna_flush, - .cb_exit = cb_logdna_exit, - .config_map = config_map, - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; |