summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_datadog/datadog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_datadog/datadog.c')
-rw-r--r--src/fluent-bit/plugins/out_datadog/datadog.c568
1 files changed, 0 insertions, 568 deletions
diff --git a/src/fluent-bit/plugins/out_datadog/datadog.c b/src/fluent-bit/plugins/out_datadog/datadog.c
deleted file mode 100644
index 082ab0fac..000000000
--- a/src/fluent-bit/plugins/out_datadog/datadog.c
+++ /dev/null
@@ -1,568 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_io.h>
-#include <fluent-bit/flb_log.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_gzip.h>
-#include <fluent-bit/flb_config_map.h>
-#include <fluent-bit/flb_version.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-
-#include <msgpack.h>
-
-#include "datadog.h"
-#include "datadog_conf.h"
-#include "datadog_remap.h"
-
-static int cb_datadog_init(struct flb_output_instance *ins,
- struct flb_config *config, void *data)
-{
- struct flb_out_datadog *ctx = NULL;
- (void) data;
-
- ctx = flb_datadog_conf_create(ins, config);
- if (!ctx) {
- return -1;
- }
-
- /* Set the plugin context */
- flb_output_set_context(ins, ctx);
- return 0;
-}
-
-static int64_t timestamp_format(const struct flb_time* tms) {
- int64_t timestamp = 0;
-
- /* Format the time, use milliseconds precision not nanoseconds */
- timestamp = tms->tm.tv_sec * 1000;
- timestamp += tms->tm.tv_nsec / 1000000;
-
- /* round up if necessary */
- if (tms->tm.tv_nsec % 1000000 >= 500000) {
- ++timestamp;
- }
- return timestamp;
-}
-
-static void dd_msgpack_pack_key_value_str(msgpack_packer* mp_pck,
- const char *key, size_t key_size,
- const char *val, size_t val_size)
-{
- msgpack_pack_str(mp_pck, key_size);
- msgpack_pack_str_body(mp_pck, key, key_size);
- msgpack_pack_str(mp_pck, val_size);
- msgpack_pack_str_body(mp_pck,val, val_size);
-}
-
-static int dd_compare_msgpack_obj_key_with_str(const msgpack_object obj, const char *key, size_t key_size) {
-
- if (obj.via.str.size == key_size && memcmp(obj.via.str.ptr,key, key_size) == 0) {
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-static int datadog_format(struct flb_config *config,
- struct flb_input_instance *ins,
- void *plugin_context,
- void *flush_ctx,
- int event_type,
- const char *tag, int tag_len,
- const void *data, size_t bytes,
- void **out_data, size_t *out_size)
-{
- int i;
- int ind;
- int byte_cnt = 64;
- int remap_cnt;
- int ret;
- /* for msgpack global structs */
- size_t array_size = 0;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- /* for sub msgpack objs */
- int map_size;
- int64_t timestamp;
- msgpack_object map;
- msgpack_object k;
- msgpack_object v;
- struct flb_out_datadog *ctx = plugin_context;
- struct flb_event_chunk *event_chunk;
-
- /* output buffer */
- flb_sds_t out_buf;
- flb_sds_t remapped_tags = NULL;
- flb_sds_t tmp = NULL;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- /* in normal flush callback we have the event_chunk set as flush context
- * so we don't need to calculate the event len.
- * But in test mode the formatter won't get the event_chunk as flush_ctx
- */
- if (flush_ctx != NULL) {
- event_chunk = flush_ctx;
- array_size = event_chunk->total_events;
- } else {
- array_size = flb_mp_count(data, bytes);
- }
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return -1;
- }
-
- /* Create temporary msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Prepare array for all entries */
- msgpack_pack_array(&mp_pck, array_size);
-
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- timestamp = timestamp_format(&log_event.timestamp);
-
- map = *log_event.body;
- map_size = map.via.map.size;
-
- /*
- * msgpack requires knowing/allocating exact map size in advance, so we need to
- * loop through the map twice. First time here to count how many attr we can
- * remap to tags, and second time later where we actually perform the remapping.
- */
- remap_cnt = 0, byte_cnt = ctx->dd_tags ? flb_sds_len(ctx->dd_tags) : 0;
- if (ctx->remap) {
- for (i = 0; i < map_size; i++) {
- if (dd_attr_need_remapping(map.via.map.ptr[i].key,
- map.via.map.ptr[i].val) >= 0) {
- remap_cnt++;
- /*
- * here we also *estimated* the size of buffer needed to hold the
- * remapped tags. We can't know the size for sure until we do the
- * remapping, the estimation here is just for efficiency, so that
- * appending tags won't cause repeated resizing/copying
- */
- byte_cnt += 2 * (map.via.map.ptr[i].key.via.str.size +
- map.via.map.ptr[i].val.via.str.size);
- }
- }
-
- if (!remapped_tags) {
- remapped_tags = flb_sds_create_size(byte_cnt);
- if (!remapped_tags) {
- flb_errno();
- msgpack_sbuffer_destroy(&mp_sbuf);
- flb_log_event_decoder_destroy(&log_decoder);
- return -1;
- }
- }
- else if (flb_sds_len(remapped_tags) < byte_cnt) {
- tmp = flb_sds_increase(remapped_tags, byte_cnt - flb_sds_len(remapped_tags));
- if (!tmp) {
- flb_errno();
- flb_sds_destroy(remapped_tags);
- msgpack_sbuffer_destroy(&mp_sbuf);
- flb_log_event_decoder_destroy(&log_decoder);
- return -1;
- }
- remapped_tags = tmp;
- }
-
- /*
- * we reuse this buffer across messages, which means we have to clear it
- * for each message flb_sds doesn't have a clear function, so we copy a
- * empty string to achieve the same effect
- */
- remapped_tags = flb_sds_copy(remapped_tags, "", 0);
- }
-
- /*
- * build new object(map) with additional space for datadog entries for those
- * remapped attributes, we need to remove them from the map. Note: If there were
- * no dd_tags specified AND there will be remapped attributes, we need to add 1
- * to account for the new presense of the dd_tags
- */
- if (remap_cnt && (ctx->dd_tags == NULL)) {
- msgpack_pack_map(&mp_pck,
- ctx->nb_additional_entries + map_size + 1 - remap_cnt);
- }
- else {
- msgpack_pack_map(&mp_pck, ctx->nb_additional_entries + map_size - remap_cnt);
- }
-
- /* timestamp */
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->json_date_key));
- msgpack_pack_str_body(&mp_pck,
- ctx->json_date_key,
- flb_sds_len(ctx->json_date_key));
- msgpack_pack_int64(&mp_pck, timestamp);
-
- /* include_tag_key */
- if (ctx->include_tag_key == FLB_TRUE) {
- dd_msgpack_pack_key_value_str(&mp_pck,
- ctx->tag_key, flb_sds_len(ctx->tag_key),
- tag, tag_len);
- }
-
- /* dd_source */
- if (ctx->dd_source != NULL) {
- dd_msgpack_pack_key_value_str(&mp_pck,
- FLB_DATADOG_DD_SOURCE_KEY,
- sizeof(FLB_DATADOG_DD_SOURCE_KEY) -1,
- ctx->dd_source, flb_sds_len(ctx->dd_source));
- }
-
- /* dd_service */
- if (ctx->dd_service != NULL) {
- dd_msgpack_pack_key_value_str(&mp_pck,
- FLB_DATADOG_DD_SERVICE_KEY,
- sizeof(FLB_DATADOG_DD_SERVICE_KEY) -1,
- ctx->dd_service, flb_sds_len(ctx->dd_service));
- }
-
- /* Append initial object k/v */
- ind = 0;
- for (i = 0; i < map_size; i++) {
- k = map.via.map.ptr[i].key;
- v = map.via.map.ptr[i].val;
-
- /*
- * actually perform the remapping here. For matched attr, we remap and
- * append them to remapped_tags buffer, then skip the rest of processing
- * (so they won't be packed as attr)
- */
- if (ctx->remap && (ind = dd_attr_need_remapping(k, v)) >=0 ) {
- ret = remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
- &remapped_tags);
- if (ret < 0) {
- flb_plg_error(ctx->ins, "Failed to remap tag: %s, skipping", remapping[ind].remap_tag_name);
- }
- continue;
- }
-
- /* Mapping between input keys to specific datadog keys */
- if (ctx->dd_message_key != NULL &&
- dd_compare_msgpack_obj_key_with_str(k, ctx->dd_message_key,
- flb_sds_len(ctx->dd_message_key)) == FLB_TRUE) {
- msgpack_pack_str(&mp_pck, sizeof(FLB_DATADOG_DD_MESSAGE_KEY)-1);
- msgpack_pack_str_body(&mp_pck, FLB_DATADOG_DD_MESSAGE_KEY,
- sizeof(FLB_DATADOG_DD_MESSAGE_KEY)-1);
- }
- else {
- msgpack_pack_object(&mp_pck, k);
- }
-
- msgpack_pack_object(&mp_pck, v);
- }
-
- /* here we concatenate ctx->dd_tags and remapped_tags, depending on their presence */
- if (remap_cnt) {
- if (ctx->dd_tags != NULL) {
- tmp = flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
- strlen(FLB_DATADOG_TAG_SEPERATOR));
- if (!tmp) {
- flb_errno();
- flb_sds_destroy(remapped_tags);
- msgpack_sbuffer_destroy(&mp_sbuf);
- flb_log_event_decoder_destroy(&log_decoder);
- return -1;
- }
- remapped_tags = tmp;
- flb_sds_cat(remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags));
- if (!tmp) {
- flb_errno();
- flb_sds_destroy(remapped_tags);
- msgpack_sbuffer_destroy(&mp_sbuf);
- flb_log_event_decoder_destroy(&log_decoder);
- return -1;
- }
- remapped_tags = tmp;
- }
- dd_msgpack_pack_key_value_str(&mp_pck,
- FLB_DATADOG_DD_TAGS_KEY,
- sizeof(FLB_DATADOG_DD_TAGS_KEY) -1,
- remapped_tags, flb_sds_len(remapped_tags));
- }
- else if (ctx->dd_tags != NULL) {
- dd_msgpack_pack_key_value_str(&mp_pck,
- FLB_DATADOG_DD_TAGS_KEY,
- sizeof(FLB_DATADOG_DD_TAGS_KEY) -1,
- ctx->dd_tags, flb_sds_len(ctx->dd_tags));
- }
- }
-
- /* Convert from msgpack to JSON */
- out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- if (!out_buf) {
- flb_plg_error(ctx->ins, "error formatting JSON payload");
- if (remapped_tags) {
- flb_sds_destroy(remapped_tags);
- }
- flb_log_event_decoder_destroy(&log_decoder);
- return -1;
- }
-
- *out_data = out_buf;
- *out_size = flb_sds_len(out_buf);
-
- /* Cleanup */
- flb_log_event_decoder_destroy(&log_decoder);
-
- if (remapped_tags) {
- flb_sds_destroy(remapped_tags);
- }
-
- return 0;
-}
-
-static void cb_datadog_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *i_ins,
- void *out_context,
- struct flb_config *config)
-{
- struct flb_out_datadog *ctx = out_context;
- struct flb_connection *upstream_conn;
- struct flb_http_client *client;
- void *out_buf;
- size_t out_size;
- flb_sds_t payload_buf;
- size_t payload_size = 0;
- void *final_payload_buf = NULL;
- size_t final_payload_size = 0;
- size_t b_sent;
- int ret = FLB_ERROR;
- int compressed = FLB_FALSE;
-
- /* Get upstream connection */
- upstream_conn = flb_upstream_conn_get(ctx->upstream);
- if (!upstream_conn) {
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Convert input data into a Datadog JSON payload */
- ret = datadog_format(config, i_ins,
- ctx, NULL,
- event_chunk->type,
- event_chunk->tag, flb_sds_len(event_chunk->tag),
- event_chunk->data, event_chunk->size,
- &out_buf, &out_size);
- if (ret == -1) {
- flb_upstream_conn_release(upstream_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
-
- payload_buf = (flb_sds_t) out_buf;
- payload_size = out_size;
-
- /* Should we compress the payload ? */
- if (ctx->compress_gzip == FLB_TRUE) {
- ret = flb_gzip_compress((void *) payload_buf, payload_size,
- &final_payload_buf, &final_payload_size);
- if (ret == -1) {
- flb_error("[out_http] cannot gzip payload, disabling compression");
- } else {
- compressed = FLB_TRUE;
- }
- } else {
- final_payload_buf = payload_buf;
- final_payload_size = payload_size;
- }
-
- /* Create HTTP client context */
- client = flb_http_client(upstream_conn, FLB_HTTP_POST, ctx->uri,
- final_payload_buf, final_payload_size,
- ctx->host, ctx->port,
- ctx->proxy, 0);
- if (!client) {
- flb_upstream_conn_release(upstream_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
-
- /* Add the required headers to the URI */
- flb_http_add_header(client, "User-Agent", 10, "Fluent-Bit", 10);
- flb_http_add_header(client, FLB_DATADOG_API_HDR, sizeof(FLB_DATADOG_API_HDR) - 1, ctx->api_key, flb_sds_len(ctx->api_key));
- flb_http_add_header(client, FLB_DATADOG_ORIGIN_HDR, sizeof(FLB_DATADOG_ORIGIN_HDR) - 1, "Fluent-Bit", 10);
- flb_http_add_header(client, FLB_DATADOG_ORIGIN_VERSION_HDR, sizeof(FLB_DATADOG_ORIGIN_VERSION_HDR) - 1, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1);
- flb_http_add_header(client,
- FLB_DATADOG_CONTENT_TYPE, sizeof(FLB_DATADOG_CONTENT_TYPE) - 1,
- FLB_DATADOG_MIME_JSON, sizeof(FLB_DATADOG_MIME_JSON) - 1);
-
- /* Content Encoding: gzip */
- if (compressed == FLB_TRUE) {
- flb_http_set_content_encoding_gzip(client);
- }
- /* TODO: Append other headers if needed*/
-
- /* finaly send the query */
- ret = flb_http_do(client, &b_sent);
- if (ret == 0) {
- if (client->resp.status < 200 || client->resp.status > 205) {
- flb_plg_error(ctx->ins, "%s%s:%i HTTP status=%i",
- ctx->scheme, ctx->host, ctx->port,
- client->resp.status);
- ret = FLB_RETRY;
- }
- else {
- if (client->resp.payload) {
- flb_plg_debug(ctx->ins, "%s%s, port=%i, HTTP status=%i payload=%s",
- ctx->scheme, ctx->host, ctx->port,
- client->resp.status, client->resp.payload);
- }
- else {
- flb_plg_debug(ctx->ins, "%s%s, port=%i, HTTP status=%i",
- ctx->scheme, ctx->host, ctx->port,
- client->resp.status);
- }
- ret = FLB_OK;
- }
- }
- else {
- flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
- ctx->host, ctx->port, ret);
- ret = FLB_RETRY;
- }
-
- /*
- * If the final_payload_buf buffer is different than payload_buf, means
- * we generated a different payload and must be freed.
- */
- if (final_payload_buf != payload_buf) {
- flb_free(final_payload_buf);
- }
- /* Destroy HTTP client context */
- flb_sds_destroy(payload_buf);
- flb_http_client_destroy(client);
- flb_upstream_conn_release(upstream_conn);
-
- FLB_OUTPUT_RETURN(ret);
-}
-
-
-static int cb_datadog_exit(void *data, struct flb_config *config)
-{
- struct flb_out_datadog *ctx = data;
-
- if (!ctx) {
- return 0;
- }
-
- flb_datadog_conf_destroy(ctx);
- return 0;
-}
-
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "compress", "false",
- 0, FLB_FALSE, 0,
- "compresses the payload in GZIP format, "
- "Datadog supports and recommends setting this to 'gzip'."
- },
- {
- FLB_CONFIG_MAP_STR, "apikey", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, api_key),
- "Datadog API key"
- },
- {
- FLB_CONFIG_MAP_STR, "dd_service", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_service),
- "The human readable name for your service generating the logs "
- "- the name of your application or database."
- },
- {
- FLB_CONFIG_MAP_STR, "dd_source", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_source),
- "A human readable name for the underlying technology of your service. "
- "For example, 'postgres' or 'nginx'."
- },
- {
- FLB_CONFIG_MAP_STR, "dd_tags", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_tags),
- "The tags you want to assign to your logs in Datadog."
- },
-
- {
- FLB_CONFIG_MAP_STR, "proxy", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, proxy),
- "Specify an HTTP Proxy. The expected format of this value is http://host:port. "
- "Note that https is not supported yet."
- },
- {
- FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, include_tag_key),
- "If enabled, tag is appended to output. "
- "The key name is used 'tag_key' property."
- },
- {
- FLB_CONFIG_MAP_STR, "tag_key", FLB_DATADOG_DEFAULT_TAG_KEY,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, tag_key),
- "The key name of tag. If 'include_tag_key' is false, "
- "This property is ignored"
- },
- {
- FLB_CONFIG_MAP_STR, "dd_message_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_message_key),
- "By default, the plugin searches for the key 'log' "
- "and remap the value to the key 'message'. "
- "If the property is set, the plugin will search the property name key."
- },
- {
- FLB_CONFIG_MAP_STR, "provider", NULL,
- 0, FLB_FALSE, 0,
- "To activate the remapping, specify configuration flag provider with value 'ecs'"
- },
- {
- FLB_CONFIG_MAP_STR, "json_date_key", FLB_DATADOG_DEFAULT_TIME_KEY,
- 0, FLB_TRUE, offsetof(struct flb_out_datadog, json_date_key),
- "Date key name for output."
- },
-
- /* EOF */
- {0}
-};
-
-struct flb_output_plugin out_datadog_plugin = {
- .name = "datadog",
- .description = "Send events to DataDog HTTP Event Collector",
- .cb_init = cb_datadog_init,
- .cb_flush = cb_datadog_flush,
- .cb_exit = cb_datadog_exit,
-
- /* Test */
- .test_formatter.callback = datadog_format,
-
- /* Config map */
- .config_map = config_map,
-
- /* Plugin flags */
- .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
-};