diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_nrlogs/newrelic.c')
-rw-r--r-- | src/fluent-bit/plugins/out_nrlogs/newrelic.c | 566 |
1 files changed, 0 insertions, 566 deletions
diff --git a/src/fluent-bit/plugins/out_nrlogs/newrelic.c b/src/fluent-bit/plugins/out_nrlogs/newrelic.c deleted file mode 100644 index 54163a66a..000000000 --- a/src/fluent-bit/plugins/out_nrlogs/newrelic.c +++ /dev/null @@ -1,566 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_version.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_gzip.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_log_event_decoder.h> - -#include "newrelic.h" - -static inline uint64_t time_to_milliseconds(struct flb_time *tms) -{ - return ((tms->tm.tv_sec * 1000) + (tms->tm.tv_nsec / 1000000)); -} - -static inline int key_matches(msgpack_object k, char *name, int len) -{ - if (k.type != MSGPACK_OBJECT_STR) { - return FLB_FALSE; - } - - if (k.via.str.size != len) { - return FLB_FALSE; - } - - if (memcmp(k.via.str.ptr, name, len) == 0) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static int package_record(struct flb_time *ts, msgpack_object *map, - msgpack_packer *mp_pck) -{ - int i; - int map_size = 0; - uint64_t timestamp_ms; - int log = -1; - int message = -1; - msgpack_object k; - msgpack_object v; - - /* Check if 'message' or 'log' key exists in the record */ - for (i = 0; i < map->via.map.size; i++) { - k = map->via.map.ptr[i].key; - - if (message == -1 && key_matches(k, "message", 7) == FLB_TRUE) { - message = i; - continue; - } - - /* If we find 'log', just stop iterating */ - if (log == -1 && key_matches(k, "log", 3) == FLB_TRUE) { - log = i; - break; - } - } - - /* The log map contains at least 2 entries: 'timestamp' and 'attributes' */ - map_size = 2; - - /* If 'log' or 'message' are set, we add the 'message' key */ - if (log >= 0 || message >= 0) { - map_size++; - } - - /* Package the final record */ - msgpack_pack_map(mp_pck, map_size); - - /* Convert timestamp to milliseconds */ - timestamp_ms = time_to_milliseconds(ts); - - /* Pack timestamp */ - msgpack_pack_str(mp_pck, 9); - msgpack_pack_str_body(mp_pck, "timestamp", 9); - msgpack_pack_uint64(mp_pck, timestamp_ms); - - /* Keep 'log' over 'message' */ - if (log >= 0) { - message = -1; - msgpack_pack_str(mp_pck, 7); - msgpack_pack_str_body(mp_pck, "message", 7); - v = map->via.map.ptr[log].val; - msgpack_pack_object(mp_pck, v); - } - else if (message >= 0) { - msgpack_pack_str(mp_pck, 7); - msgpack_pack_str_body(mp_pck, "message", 7); - v = map->via.map.ptr[message].val; - msgpack_pack_object(mp_pck, v); - } - - /* Adjust attributes map size */ - map_size = map->via.map.size; - if (log >= 0 || message >= 0) { - map_size--; - } - - msgpack_pack_str(mp_pck, 10); - msgpack_pack_str_body(mp_pck, "attributes", 10); - msgpack_pack_map(mp_pck, map_size); - - /* Pack remaining attributes */ - for (i = 0; i < map->via.map.size; i++) { - k = map->via.map.ptr[i].key; - v = map->via.map.ptr[i].val; - - if (log >= 0 && key_matches(k, "log", 3) == FLB_TRUE) { - continue; - } - - if (message >= 0 && key_matches(k, "message", 7) == FLB_TRUE) { - continue; - } - - msgpack_pack_object(mp_pck, k); - msgpack_pack_object(mp_pck, v); - } - - return 0; -} - -static flb_sds_t newrelic_compose_payload(struct flb_newrelic *ctx, - const void *data, size_t bytes) -{ - int total_records; - flb_sds_t json; - msgpack_packer mp_pck; - msgpack_sbuffer mp_sbuf; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - - /* - * Following the New Relic Fluentd implementation, this is the - * suggested structure for our payload: - * - * payload = {[ - * 'common' => { - * 'attributes' => { - * 'plugin' => { - * 'type' => 'fluentd', - * 'version' => NewrelicFluentdOutput::VERSION, - * } - * } - * }, - * 'logs' => [] - * ]} - */ - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return NULL; - } - - /* Count number of records */ - total_records = flb_mp_count(data, bytes); - - /* Initialize msgpack buffers */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* The New Relic MELT API format is wrapped in an array */ - msgpack_pack_array(&mp_pck, 1); - - /* Map for 'common' and 'logs' */ - msgpack_pack_map(&mp_pck, 2); - - /* 'common' map */ - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "common", 6); - msgpack_pack_map(&mp_pck, 1); - - /* common['attributes'] */ - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "attributes", 10); - msgpack_pack_map(&mp_pck, 1); - - /* common['attributes']['plugin'] */ - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "plugin", 6); - msgpack_pack_map(&mp_pck, 2); - - /* common['attributes']['plugin']['type'] = 'Fluent Bit' */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "type", 4); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "Fluent Bit", 10); - - /* common['attributes']['plugin']['version'] = 'FLB_VERSION_STR' */ - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "version", 7); - msgpack_pack_str(&mp_pck, sizeof(FLB_VERSION_STR) - 1); - msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1); - - /* 'logs' array */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "logs", 4); - msgpack_pack_array(&mp_pck, total_records); - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - /* Package the record */ - package_record(&log_event.timestamp, log_event.body, &mp_pck); - } - - flb_log_event_decoder_destroy(&log_decoder); - - json = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - - msgpack_sbuffer_destroy(&mp_sbuf); - - return json; -} - -static void newrelic_config_destroy(struct flb_newrelic *ctx) -{ - flb_free(ctx->nr_protocol); - flb_free(ctx->nr_host); - flb_free(ctx->nr_uri); - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - flb_free(ctx); -} - -static struct flb_newrelic *newrelic_config_create(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - char *port = NULL; - struct flb_newrelic *ctx; - struct flb_upstream *upstream; - - /* Create context */ - ctx = flb_calloc(1, sizeof(struct flb_newrelic)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - - /* Load config map */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - newrelic_config_destroy(ctx); - return NULL; - } - - /* At least we need one of api_key or license_key */ - if (!ctx->api_key && !ctx->license_key) { - flb_plg_error(ctx->ins, "no 'api_key' or 'license_key' was configured"); - newrelic_config_destroy(ctx); - return NULL; - } - - /* Parse Base URL */ - ret = flb_utils_url_split(ctx->base_uri, - &ctx->nr_protocol, - &ctx->nr_host, - &port, - &ctx->nr_uri); - if (ret == -1) { - flb_plg_error(ctx->ins, "error parsing base_uri '%s'", ctx->base_uri); - newrelic_config_destroy(ctx); - return NULL; - } - ctx->nr_port = atoi(port); - flb_free(port); - - if (strcasecmp(ctx->compress, "gzip") == 0) { - ctx->compress_gzip = FLB_TRUE; - } - else if (flb_utils_bool(ctx->compress) == FLB_FALSE) { - ctx->compress_gzip = FLB_FALSE; - } - else { - flb_plg_warn(ctx->ins, - "unknown compress encoding value '%s', " - "payload compression has been disabled", - ctx->compress); - ctx->compress_gzip = FLB_FALSE; - } - - /* Create Upstream connection context */ - upstream = flb_upstream_create(config, - ctx->nr_host, - ctx->nr_port, - FLB_IO_TLS, ins->tls); - if (!upstream) { - flb_free(ctx); - return NULL; - } - ctx->u = upstream; - flb_output_upstream_set(ctx->u, ins); - - return ctx; -} - -static int cb_newrelic_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - struct flb_newrelic *ctx; - - /* Create plugin context */ - ctx = newrelic_config_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "cannot initialize configuration"); - return -1; - } - - /* Register context with plugin instance */ - flb_output_set_context(ins, ctx); - - /* - * This plugin instance uses the HTTP client interface, let's register - * it debugging callbacks. - */ - flb_output_set_http_debug_callbacks(ins); - - flb_plg_info(ins, "configured, hostname=%s:%i", ctx->nr_host, ctx->nr_port); - return 0; -} - -static void cb_newrelic_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - int ret; - int out_ret = FLB_OK; - int compressed = FLB_FALSE; - size_t b_sent; - flb_sds_t payload; - void *payload_buf = NULL; - size_t payload_size = 0; - struct flb_newrelic *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - - /* Format the data to the expected Newrelic Payload */ - payload = newrelic_compose_payload(ctx, - event_chunk->data, event_chunk->size); - if (!payload) { - flb_plg_error(ctx->ins, "cannot compose request payload"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Map payload */ - payload_buf = (void *) payload; - payload_size = flb_sds_len(payload); - - /* Should we compress the payload ? */ - if (ctx->compress_gzip == FLB_TRUE) { - ret = flb_gzip_compress(payload, flb_sds_len(payload), - &payload_buf, &payload_size); - if (ret == -1) { - flb_plg_error(ctx->ins, - "cannot gzip payload, disabling compression"); - } - else { - compressed = FLB_TRUE; - flb_sds_destroy(payload); - } - } - - /* Lookup an available connection context */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - flb_plg_error(ctx->ins, "no upstream connections available"); - if (compressed == FLB_TRUE) { - flb_free(payload_buf); - } - else { - flb_sds_destroy(payload); - } - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Create HTTP client context */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->nr_uri, - payload_buf, payload_size, - ctx->nr_host, ctx->nr_port, - NULL, 0); - if (!c) { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - if (compressed == FLB_TRUE) { - flb_free(payload_buf); - } - else { - flb_sds_destroy(payload); - } - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Set callback context to the HTTP client context */ - flb_http_set_callback_context(c, ctx->ins->callback); - - /* User Agent */ - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - - /* API / License Key */ - if (ctx->license_key) { - flb_http_add_header(c, - "X-License-Key", 13, - ctx->license_key, flb_sds_len(ctx->license_key)); - } - else if (ctx->api_key) { - flb_http_add_header(c, - "X-Insert-Key", 12, - ctx->api_key, flb_sds_len(ctx->api_key)); - } - - /* Add Content-Type header */ - flb_http_add_header(c, - FLB_NEWRELIC_CT, sizeof(FLB_NEWRELIC_CT) - 1, - FLB_NEWRELIC_CT_JSON, sizeof(FLB_NEWRELIC_CT_JSON) - 1); - - /* Encoding */ - if (compressed == FLB_TRUE) { - flb_http_set_content_encoding_gzip(c); - } - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* Destroy buffers */ - if (compressed == FLB_FALSE) { - flb_sds_destroy(payload); - } - else { - flb_free(payload_buf); - } - - /* Validate HTTP client return status */ - if (ret == 0) { - /* - * Only allow the following HTTP status: - * - * - 200: OK - * - 201: Created - * - 202: Accepted - * - 203: no authorative resp - * - 204: No Content - * - 205: Reset content - * - */ - if (c->resp.status < 200 || c->resp.status > 205) { - if (c->resp.payload) { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->nr_host, ctx->nr_port, c->resp.status, - c->resp.payload); - } - else { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", - ctx->nr_host, ctx->nr_port, c->resp.status); - } - out_ret = FLB_RETRY; - } - else { - if (c->resp.payload) { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->nr_host, ctx->nr_port, - c->resp.status, c->resp.payload); - } - else { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i", - ctx->nr_host, ctx->nr_port, - c->resp.status); - } - } - } - else { - flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)", - ctx->nr_host, ctx->nr_port, ret); - out_ret = FLB_RETRY; - } - - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(out_ret); -} - -static int cb_newrelic_exit(void *data, struct flb_config *config) -{ - struct flb_newrelic *ctx = data; - - if (!ctx) { - return 0; - } - - newrelic_config_destroy(ctx); - - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "base_uri", FLB_NEWRELIC_BASE_URI, - 0, FLB_TRUE, offsetof(struct flb_newrelic, base_uri), - "New Relic Host address" - }, - - { - FLB_CONFIG_MAP_STR, "api_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_newrelic, api_key), - "New Relic API Key" - }, - - { - FLB_CONFIG_MAP_STR, "license_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_newrelic, license_key), - "New Relic License Key" - }, - - { - FLB_CONFIG_MAP_STR, "compress", "gzip", - 0, FLB_TRUE, offsetof(struct flb_newrelic, compress), - "Set payload compression mechanism", - }, - - /* EOF */ - {0} - -}; - -/* Plugin reference */ -struct flb_output_plugin out_nrlogs_plugin = { - .name = "nrlogs", - .description = "New Relic", - .cb_init = cb_newrelic_init, - .cb_flush = cb_newrelic_flush, - .cb_exit = cb_newrelic_exit, - .config_map = config_map, - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; |