Diffstat (limited to 'src/fluent-bit/plugins/out_es/es.c')
-rw-r--r--  src/fluent-bit/plugins/out_es/es.c  1230
1 file changed, 0 insertions, 1230 deletions
diff --git a/src/fluent-bit/plugins/out_es/es.c b/src/fluent-bit/plugins/out_es/es.c
deleted file mode 100644
index db2bcee5b..000000000
--- a/src/fluent-bit/plugins/out_es/es.c
+++ /dev/null
@@ -1,1230 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_network.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_signv4.h>
-#include <fluent-bit/flb_aws_credentials.h>
-#include <fluent-bit/flb_gzip.h>
-#include <fluent-bit/flb_record_accessor.h>
-#include <fluent-bit/flb_ra_key.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-#include <msgpack.h>
-
-#include <time.h>
-
-#include "es.h"
-#include "es_conf.h"
-#include "es_bulk.h"
-#include "murmur3.h"
-
-struct flb_output_plugin out_es_plugin;
-
-static int es_pack_array_content(msgpack_packer *tmp_pck,
- msgpack_object array,
- struct flb_elasticsearch *ctx);
-
-#ifdef FLB_HAVE_AWS
-static flb_sds_t add_aws_auth(struct flb_http_client *c,
- struct flb_elasticsearch *ctx)
-{
- flb_sds_t signature = NULL;
- int ret;
-
- flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4");
-
- /* Amazon OpenSearch Sigv4 does not allow the host header to include the port */
- ret = flb_http_strip_port_from_host(c);
- if (ret < 0) {
- flb_plg_error(ctx->ins, "could not strip port from host for sigv4");
- return NULL;
- }
-
- /* AWS Fluent Bit user agent */
- flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21);
-
- signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL),
- ctx->aws_region, ctx->aws_service_name,
- S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers,
- ctx->aws_provider);
- if (!signature) {
- flb_plg_error(ctx->ins, "could not sign request with sigv4");
- return NULL;
- }
- return signature;
-}
-#endif /* FLB_HAVE_AWS */
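/*
 * For illustration only (not taken from this file): assuming the default
 * aws_service_name "es" and a region such as us-east-1, the flb_signv4_do()
 * call above is expected to attach a SigV4 Authorization header to the bulk
 * request roughly shaped like:
 *
 *   Authorization: AWS4-HMAC-SHA256
 *       Credential=AKIA.../20220301/us-east-1/es/aws4_request,
 *       SignedHeaders=host;x-amz-date, Signature=<hex digest>
 *
 * The exact signed-header list and signature depend on the request contents
 * and the resolved AWS credentials.
 */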
-
-static int es_pack_map_content(msgpack_packer *tmp_pck,
- msgpack_object map,
- struct flb_elasticsearch *ctx)
-{
- int i;
- char *ptr_key = NULL;
- char buf_key[256];
- msgpack_object *k;
- msgpack_object *v;
-
- for (i = 0; i < map.via.map.size; i++) {
- k = &map.via.map.ptr[i].key;
- v = &map.via.map.ptr[i].val;
- ptr_key = NULL;
-
- /* Store key */
- const char *key_ptr = NULL;
- size_t key_size = 0;
-
- if (k->type == MSGPACK_OBJECT_BIN) {
- key_ptr = k->via.bin.ptr;
- key_size = k->via.bin.size;
- }
- else if (k->type == MSGPACK_OBJECT_STR) {
- key_ptr = k->via.str.ptr;
- key_size = k->via.str.size;
- }
-
- if (key_size < (sizeof(buf_key) - 1)) {
- memcpy(buf_key, key_ptr, key_size);
- buf_key[key_size] = '\0';
- ptr_key = buf_key;
- }
- else {
- /* Long map keys have a performance penalty */
- ptr_key = flb_malloc(key_size + 1);
- if (!ptr_key) {
- flb_errno();
- return -1;
- }
-
- memcpy(ptr_key, key_ptr, key_size);
- ptr_key[key_size] = '\0';
- }
-
- /*
- * Sanitize the key name: Elasticsearch 2.x doesn't allow dots
- * in field names:
- *
- * https://goo.gl/R5NMTr
- */
- if (ctx->replace_dots == FLB_TRUE) {
- char *p = ptr_key;
- char *end = ptr_key + key_size;
- while (p != end) {
- if (*p == '.') *p = '_';
- p++;
- }
- }
-
- /* Append the key */
- msgpack_pack_str(tmp_pck, key_size);
- msgpack_pack_str_body(tmp_pck, ptr_key, key_size);
-
- /* Release the temporary key if it was allocated */
- if (ptr_key && ptr_key != buf_key) {
- flb_free(ptr_key);
- }
- ptr_key = NULL;
-
- /*
- * The value can be any data type; if it's a map we need to
- * sanitize it to avoid dots.
- */
- if (v->type == MSGPACK_OBJECT_MAP) {
- msgpack_pack_map(tmp_pck, v->via.map.size);
- es_pack_map_content(tmp_pck, *v, ctx);
- }
- /*
- * The value can be any data type; if it's an array we need to
- * pass it to es_pack_array_content.
- */
- else if (v->type == MSGPACK_OBJECT_ARRAY) {
- msgpack_pack_array(tmp_pck, v->via.array.size);
- es_pack_array_content(tmp_pck, *v, ctx);
- }
- else {
- msgpack_pack_object(tmp_pck, *v);
- }
- }
- return 0;
-}
-
-/*
- * Iterate through the array and sanitize elements.
- * Mutual recursion with es_pack_map_content.
- */
-static int es_pack_array_content(msgpack_packer *tmp_pck,
- msgpack_object array,
- struct flb_elasticsearch *ctx)
-{
- int i;
- msgpack_object *e;
-
- for (i = 0; i < array.via.array.size; i++) {
- e = &array.via.array.ptr[i];
- if (e->type == MSGPACK_OBJECT_MAP)
- {
- msgpack_pack_map(tmp_pck, e->via.map.size);
- es_pack_map_content(tmp_pck, *e, ctx);
- }
- else if (e->type == MSGPACK_OBJECT_ARRAY)
- {
- msgpack_pack_array(tmp_pck, e->via.array.size);
- es_pack_array_content(tmp_pck, *e, ctx);
- }
- else
- {
- msgpack_pack_object(tmp_pck, *e);
- }
- }
- return 0;
-}
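/*
 * A minimal standalone sketch (the helper name is illustrative, not part of
 * the plugin) of the same dot-replacement idea applied to a NUL-terminated
 * key; the functions above do the equivalent on msgpack objects. With
 * replace_dots enabled, a record key such as "kubernetes.pod.name" is
 * emitted as "kubernetes_pod_name".
 */
static void sanitize_key_sketch(char *key)
{
    char *p;

    /* Replace every dot with an underscore, since Elasticsearch 2.x
     * rejects dots in field names. */
    for (p = key; *p != '\0'; p++) {
        if (*p == '.') {
            *p = '_';
        }
    }
}

/* Usage: char k[] = "kubernetes.pod.name"; sanitize_key_sketch(k);
 *        k now holds "kubernetes_pod_name". */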
-
-/*
- * Get the _id value from the incoming record.
- * If it succeeds, return the value as an flb_sds_t.
- * If it fails, return NULL.
- */
-static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
- msgpack_object *map)
-{
- struct flb_ra_value *rval = NULL;
- flb_sds_t tmp_str;
- rval = flb_ra_get_value_object(ctx->ra_id_key, *map);
- if (rval == NULL) {
- flb_plg_warn(ctx->ins, "the value of %s is missing",
- ctx->id_key);
- return NULL;
- }
- else if (rval->o.type != MSGPACK_OBJECT_STR) {
- flb_plg_warn(ctx->ins, "the value of %s is not a string",
- ctx->id_key);
- flb_ra_key_value_destroy(rval);
- return NULL;
- }
-
- tmp_str = flb_sds_create_len(rval->o.via.str.ptr,
- rval->o.via.str.size);
- if (tmp_str == NULL) {
- flb_plg_warn(ctx->ins, "cannot create ID string from record");
- flb_ra_key_value_destroy(rval);
- return NULL;
- }
- flb_ra_key_value_destroy(rval);
- return tmp_str;
-}
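/*
 * For illustration, assuming the configuration sets Id_Key (which es_conf.c
 * turns into the 'ra_id_key' record accessor), a record such as
 *
 *   {"request_id": "abc-123", "log": "..."}
 *
 * combined with 'Id_Key request_id' makes this function return the sds
 * string "abc-123", which is later placed into the bulk header as "_id".
 * Missing keys and non-string values return NULL and only log a warning.
 */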
-
-static int compose_index_header(struct flb_elasticsearch *ctx,
- int es_index_custom_len,
- char *logstash_index, size_t logstash_index_size,
- char *separator_str,
- struct tm *tm)
-{
- int ret;
- int len;
- char *p;
- size_t s;
-
- /* Compose Index header */
- if (es_index_custom_len > 0) {
- p = logstash_index + es_index_custom_len;
- } else {
- p = logstash_index + flb_sds_len(ctx->logstash_prefix);
- }
- len = p - logstash_index;
- ret = snprintf(p, logstash_index_size - len, "%s",
- separator_str);
- if (ret > logstash_index_size - len) {
- /* exceed limit */
- return -1;
- }
- p += strlen(separator_str);
- len += strlen(separator_str);
-
- s = strftime(p, logstash_index_size - len,
- ctx->logstash_dateformat, tm);
- if (s==0) {
- /* exceed limit */
- return -1;
- }
- p += s;
- *p++ = '\0';
-
- return 0;
-}
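/*
 * A minimal self-contained sketch (helper name and values are illustrative)
 * of the index name composed above, assuming prefix "fluent-bit", separator
 * "-" and the usual "%Y.%m.%d" date format: compose_index_header() appends
 * "<separator><date>" right after the prefix already copied into the buffer,
 * producing something like "fluent-bit-2022.03.01".
 */
#include <stdio.h>
#include <string.h>
#include <time.h>

static void compose_index_sketch(void)
{
    char index[256] = "fluent-bit";   /* logstash_prefix already copied */
    size_t len = strlen(index);
    time_t now = time(NULL);
    struct tm tm_utc;

    gmtime_r(&now, &tm_utc);

    /* Append the separator */
    snprintf(index + len, sizeof(index) - len, "%s", "-");
    len += 1;

    /* Append the date part, e.g. "2022.03.01" */
    strftime(index + len, sizeof(index) - len, "%Y.%m.%d", &tm_utc);

    printf("%s\n", index);            /* => fluent-bit-YYYY.MM.DD */
}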
-
-/*
- * Convert the internal Fluent Bit data representation to the one
- * required by Elasticsearch.
- *
- * Sadly, this process involves converting from msgpack to JSON.
- */
-static int elasticsearch_format(struct flb_config *config,
- struct flb_input_instance *ins,
- void *plugin_context,
- void *flush_ctx,
- int event_type,
- const char *tag, int tag_len,
- const void *data, size_t bytes,
- void **out_data, size_t *out_size)
-{
- int ret;
- int len;
- int map_size;
- int index_len = 0;
- size_t s = 0;
- size_t off = 0;
- size_t off_prev = 0;
- char *es_index;
- char logstash_index[256];
- char time_formatted[256];
- char index_formatted[256];
- char es_uuid[37];
- flb_sds_t out_buf;
- size_t out_buf_len = 0;
- flb_sds_t tmp_buf;
- flb_sds_t id_key_str = NULL;
- // msgpack_unpacked result;
- // msgpack_object root;
- msgpack_object map;
- // msgpack_object *obj;
- flb_sds_t j_index;
- struct es_bulk *bulk;
- struct tm tm;
- struct flb_time tms;
- msgpack_sbuffer tmp_sbuf;
- msgpack_packer tmp_pck;
- uint16_t hash[8];
- int es_index_custom_len;
- struct flb_elasticsearch *ctx = plugin_context;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- j_index = flb_sds_create_size(ES_BULK_HEADER);
- if (j_index == NULL) {
- flb_errno();
- return -1;
- }
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
- flb_sds_destroy(j_index);
-
- return -1;
- }
-
- /* Create the bulk composer */
- bulk = es_bulk_create(bytes);
- if (!bulk) {
- flb_log_event_decoder_destroy(&log_decoder);
- flb_sds_destroy(j_index);
- return -1;
- }
-
- /* Copy logstash prefix if logstash format is enabled */
- if (ctx->logstash_format == FLB_TRUE) {
- strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
- logstash_index[sizeof(logstash_index) - 1] = '\0';
- }
-
- /*
- * If logstash format and id generation are disabled, pre-generate
- * the index line for all records.
- *
- * The header stored in 'j_index' will be used for all the records in
- * this payload.
- */
- if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
- flb_time_get(&tms);
- gmtime_r(&tms.tm.tv_sec, &tm);
- strftime(index_formatted, sizeof(index_formatted) - 1,
- ctx->index, &tm);
- es_index = index_formatted;
- if (ctx->suppress_type_name) {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_WITHOUT_TYPE,
- ctx->es_action,
- es_index);
- }
- else {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT,
- ctx->es_action,
- es_index, ctx->type);
- }
- }
-
- /*
- * Some broken clients may have time drift back to the year 1970;
- * this would generate a corresponding index in Elasticsearch.
- * In order to prevent generating millions of indexes, we can set
- * the plugin to always use the current time for index generation.
- */
- if (ctx->current_time_index == FLB_TRUE) {
- flb_time_get(&tms);
- }
-
- /* Iterate each record and do further formatting */
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
-
- /* Only take the time from the record if current_time_index is disabled */
- if (ctx->current_time_index == FLB_FALSE) {
- flb_time_copy(&tms, &log_event.timestamp);
- }
-
- map = *log_event.body;
- map_size = map.via.map.size;
-
- es_index_custom_len = 0;
- if (ctx->logstash_prefix_key) {
- flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
- (char *) tag, tag_len,
- map, NULL);
- if (v) {
- len = flb_sds_len(v);
- if (len > 128) {
- len = 128;
- memcpy(logstash_index, v, 128);
- }
- else {
- memcpy(logstash_index, v, len);
- }
- es_index_custom_len = len;
- flb_sds_destroy(v);
- }
- }
-
- /* Create temporary msgpack buffer */
- msgpack_sbuffer_init(&tmp_sbuf);
- msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
-
- if (ctx->include_tag_key == FLB_TRUE) {
- map_size++;
- }
-
- /* Set the new map size */
- msgpack_pack_map(&tmp_pck, map_size + 1);
-
- /* Append the time key */
- msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key));
- msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
-
- /* Format the time */
- gmtime_r(&tms.tm.tv_sec, &tm);
- s = strftime(time_formatted, sizeof(time_formatted) - 1,
- ctx->time_key_format, &tm);
- if (ctx->time_key_nanos) {
- len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
- ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
- } else {
- len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
- ".%03" PRIu64 "Z",
- (uint64_t) tms.tm.tv_nsec / 1000000);
- }
-
- s += len;
- msgpack_pack_str(&tmp_pck, s);
- msgpack_pack_str_body(&tmp_pck, time_formatted, s);
-
- es_index = ctx->index;
- if (ctx->logstash_format == FLB_TRUE) {
- ret = compose_index_header(ctx, es_index_custom_len,
- &logstash_index[0], sizeof(logstash_index),
- ctx->logstash_prefix_separator, &tm);
- if (ret < 0) {
- /* retry with default separator */
- compose_index_header(ctx, es_index_custom_len,
- &logstash_index[0], sizeof(logstash_index),
- "-", &tm);
- }
-
- es_index = logstash_index;
- if (ctx->generate_id == FLB_FALSE) {
- if (ctx->suppress_type_name) {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_WITHOUT_TYPE,
- ctx->es_action,
- es_index);
- }
- else {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT,
- ctx->es_action,
- es_index, ctx->type);
- }
- }
- }
- else if (ctx->current_time_index == FLB_TRUE) {
- /* Make sure we handle the time format for the index name */
- strftime(index_formatted, sizeof(index_formatted) - 1,
- ctx->index, &tm);
- es_index = index_formatted;
- }
-
- /* Tag Key */
- if (ctx->include_tag_key == FLB_TRUE) {
- msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key));
- msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
- msgpack_pack_str(&tmp_pck, tag_len);
- msgpack_pack_str_body(&tmp_pck, tag, tag_len);
- }
-
- /*
- * The map_content routine iterates over each key/value pair found in
- * the map and does some sanitization of the key names.
- *
- * Elasticsearch has a restriction that key names cannot contain
- * a dot; if a dot is found, it's replaced with an underscore.
- */
- ret = es_pack_map_content(&tmp_pck, map, ctx);
- if (ret == -1) {
- flb_log_event_decoder_destroy(&log_decoder);
- msgpack_sbuffer_destroy(&tmp_sbuf);
- es_bulk_destroy(bulk);
- flb_sds_destroy(j_index);
- return -1;
- }
-
- if (ctx->generate_id == FLB_TRUE) {
- MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
- snprintf(es_uuid, sizeof(es_uuid),
- "%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
- hash[0], hash[1], hash[2], hash[3],
- hash[4], hash[5], hash[6], hash[7]);
- if (ctx->suppress_type_name) {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
- ctx->es_action,
- es_index, es_uuid);
- }
- else {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_ID,
- ctx->es_action,
- es_index, ctx->type, es_uuid);
- }
- }
- if (ctx->ra_id_key) {
- id_key_str = es_get_id_value(ctx, &map);
- if (id_key_str) {
- if (ctx->suppress_type_name) {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
- ctx->es_action,
- es_index, id_key_str);
- }
- else {
- index_len = flb_sds_snprintf(&j_index,
- flb_sds_alloc(j_index),
- ES_BULK_INDEX_FMT_ID,
- ctx->es_action,
- es_index, ctx->type, id_key_str);
- }
- flb_sds_destroy(id_key_str);
- id_key_str = NULL;
- }
- }
-
- /* Convert msgpack to JSON */
- out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
- msgpack_sbuffer_destroy(&tmp_sbuf);
- if (!out_buf) {
- flb_log_event_decoder_destroy(&log_decoder);
- es_bulk_destroy(bulk);
- flb_sds_destroy(j_index);
- return -1;
- }
-
- out_buf_len = flb_sds_len(out_buf);
- if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
- tmp_buf = out_buf;
- out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
- out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
- flb_sds_destroy(tmp_buf);
- }
- else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
- tmp_buf = out_buf;
- out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
- out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
- flb_sds_destroy(tmp_buf);
- }
-
- ret = es_bulk_append(bulk, j_index, index_len,
- out_buf, out_buf_len,
- bytes, off_prev);
- flb_sds_destroy(out_buf);
-
- off_prev = off;
- if (ret == -1) {
- /* We likely ran out of memory, abort here */
- flb_log_event_decoder_destroy(&log_decoder);
- *out_size = 0;
- es_bulk_destroy(bulk);
- flb_sds_destroy(j_index);
- return -1;
- }
- }
- flb_log_event_decoder_destroy(&log_decoder);
-
- /* Set outgoing data */
- *out_data = bulk->ptr;
- *out_size = bulk->len;
-
- /*
- * Note: we don't destroy the bulk as we need to keep the allocated
- * buffer with the data. Instead we just release the bulk context and
- * return the bulk->ptr buffer
- */
- flb_free(bulk);
- if (ctx->trace_output) {
- fwrite(*out_data, 1, *out_size, stdout);
- fflush(stdout);
- }
- flb_sds_destroy(j_index);
- return 0;
-}
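/*
 * For illustration, the payload built above is newline-delimited JSON in the
 * Elasticsearch bulk format: one action/metadata line (j_index) followed by
 * one document line per record; the values below are assumed examples, only
 * the shape matters:
 *
 *   {"create":{"_index":"fluent-bit-2022.03.01","_type":"_doc"}}
 *   {"@timestamp":"2022-03-01T10:30:45.123Z","log":"hello","tag":"app.log"}
 *
 * With Write_Operation 'update' or 'upsert', the document line is wrapped by
 * ES_BULK_UPDATE_OP_BODY / ES_BULK_UPSERT_OP_BODY (defined in es_bulk.h),
 * which for the update case is expected to look roughly like:
 *
 *   {"update":{"_index":"...","_id":"..."}}
 *   {"doc":{...original record...}}
 */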
-
-static int cb_es_init(struct flb_output_instance *ins,
- struct flb_config *config,
- void *data)
-{
- struct flb_elasticsearch *ctx;
-
- ctx = flb_es_conf_create(ins, config);
- if (!ctx) {
- flb_plg_error(ins, "cannot initialize plugin");
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s",
- ins->host.name, ins->host.port, ctx->uri,
- ctx->index, ctx->type);
-
- flb_output_set_context(ins, ctx);
-
- /*
- * This plugin instance uses the HTTP client interface; let's register
- * its debugging callbacks.
- */
- flb_output_set_http_debug_callbacks(ins);
-
- return 0;
-}
-
-static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
- struct flb_http_client *c)
-{
- int i, j, k;
- int ret;
- int check = FLB_FALSE;
- int root_type;
- char *out_buf;
- size_t off = 0;
- size_t out_size;
- msgpack_unpacked result;
- msgpack_object root;
- msgpack_object key;
- msgpack_object val;
- msgpack_object item;
- msgpack_object item_key;
- msgpack_object item_val;
-
- /*
- * Check if our payload is complete: there are situations where
- * the Elasticsearch HTTP response body is bigger than the HTTP client
- * buffer, so the payload can be incomplete.
- */
- /* Convert JSON payload to msgpack */
- ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
- &out_buf, &out_size, &root_type, NULL);
- if (ret == -1) {
- /* Is this an incomplete HTTP response? */
- if (c->resp.payload_size <= 0) {
- return FLB_TRUE;
- }
-
- /* Lookup error field */
- if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
- return FLB_FALSE;
- }
-
- flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
- c->resp.payload);
- return FLB_TRUE;
- }
-
- /* Lookup error field */
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
- c->resp.payload);
- return FLB_TRUE;
- }
-
- root = result.data;
- if (root.type != MSGPACK_OBJECT_MAP) {
- flb_plg_error(ctx->ins, "unexpected payload type=%i",
- root.type);
- check = FLB_TRUE;
- goto done;
- }
-
- for (i = 0; i < root.via.map.size; i++) {
- key = root.via.map.ptr[i].key;
- if (key.type != MSGPACK_OBJECT_STR) {
- flb_plg_error(ctx->ins, "unexpected key type=%i",
- key.type);
- check = FLB_TRUE;
- goto done;
- }
-
- if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) {
- val = root.via.map.ptr[i].val;
- if (val.type != MSGPACK_OBJECT_BOOLEAN) {
- flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
- val.type);
- check = FLB_TRUE;
- goto done;
- }
-
- /* If error == false, we are OK (no errors = FLB_FALSE) */
- if (!val.via.boolean) {
- /* no errors */
- check = FLB_FALSE;
- goto done;
- }
- }
- else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) {
- val = root.via.map.ptr[i].val;
- if (val.type != MSGPACK_OBJECT_ARRAY) {
- flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
- val.type);
- check = FLB_TRUE;
- goto done;
- }
-
- for (j = 0; j < val.via.array.size; j++) {
- item = val.via.array.ptr[j];
- if (item.type != MSGPACK_OBJECT_MAP) {
- flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
- item.type);
- check = FLB_TRUE;
- goto done;
- }
-
- if (item.via.map.size != 1) {
- flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
- item.via.map.size);
- check = FLB_TRUE;
- goto done;
- }
-
- item = item.via.map.ptr[0].val;
- if (item.type != MSGPACK_OBJECT_MAP) {
- flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
- item.type);
- check = FLB_TRUE;
- goto done;
- }
-
- for (k = 0; k < item.via.map.size; k++) {
- item_key = item.via.map.ptr[k].key;
- if (item_key.type != MSGPACK_OBJECT_STR) {
- flb_plg_error(ctx->ins, "unexpected key type=%i",
- item_key.type);
- check = FLB_TRUE;
- goto done;
- }
-
- if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) {
- item_val = item.via.map.ptr[k].val;
-
- if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
- item_val.type);
- check = FLB_TRUE;
- goto done;
- }
- /* Check for errors other than version conflict (document already exists) */
- if (item_val.via.i64 != 409) {
- check = FLB_TRUE;
- goto done;
- }
- }
- }
- }
- }
- }
-
- done:
- flb_free(out_buf);
- msgpack_unpacked_destroy(&result);
- return check;
-}
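/*
 * For illustration, bulk responses and how this check treats them (shapes
 * assumed; only the 'errors' flag and per-item 'status' matter here):
 *
 *   {"errors":true,"items":[{"create":{"status":429,"error":{...}}}]}  -> error, retry
 *   {"errors":true,"items":[{"create":{"status":409,...}}]}            -> tolerated
 *   {"errors":false,"items":[...]}                                     -> ok
 *
 * Status 409 is a version conflict (the document already exists), which is
 * expected when records are re-delivered with generate_id or id_key set, so
 * it is not treated as a failure.
 */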
-
-static void cb_es_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *ins, void *out_context,
- struct flb_config *config)
-{
- int ret;
- size_t pack_size;
- char *pack;
- void *out_buf;
- size_t out_size;
- size_t b_sent;
- struct flb_elasticsearch *ctx = out_context;
- struct flb_connection *u_conn;
- struct flb_http_client *c;
- flb_sds_t signature = NULL;
- int compressed = FLB_FALSE;
-
- /* Get upstream connection */
- u_conn = flb_upstream_conn_get(ctx->u);
- if (!u_conn) {
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Convert format */
- ret = elasticsearch_format(config, ins,
- ctx, NULL,
- event_chunk->type,
- event_chunk->tag, flb_sds_len(event_chunk->tag),
- event_chunk->data, event_chunk->size,
- &out_buf, &out_size);
- if (ret != 0) {
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
-
- pack = (char *) out_buf;
- pack_size = out_size;
-
- /* Should we compress the payload ? */
- if (ctx->compress_gzip == FLB_TRUE) {
- ret = flb_gzip_compress((void *) pack, pack_size,
- &out_buf, &out_size);
- if (ret == -1) {
- flb_plg_error(ctx->ins,
- "cannot gzip payload, disabling compression");
- }
- else {
- compressed = FLB_TRUE;
- }
-
- /*
- * If the payload buffer is different from pack, we must free it.
- */
- if (out_buf != pack) {
- flb_free(pack);
- }
-
- pack = (char *) out_buf;
- pack_size = out_size;
- }
-
- /* Compose HTTP Client request */
- c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
- pack, pack_size, NULL, 0, NULL, 0);
-
- flb_http_buffer_size(c, ctx->buffer_size);
-
-#ifndef FLB_HAVE_AWS
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
-#endif
-
- flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);
-
- if (ctx->http_user && ctx->http_passwd) {
- flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
- }
- else if (ctx->cloud_user && ctx->cloud_passwd) {
- flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd);
- }
-
-#ifdef FLB_HAVE_AWS
- if (ctx->has_aws_auth == FLB_TRUE) {
- signature = add_aws_auth(c, ctx);
- if (!signature) {
- goto retry;
- }
- }
- else {
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
- }
-#endif
-
- /* Content Encoding: gzip */
- if (compressed == FLB_TRUE) {
- flb_http_set_content_encoding_gzip(c);
- }
-
- /* Map debug callbacks */
- flb_http_client_debug(c, ctx->ins->callback);
-
- ret = flb_http_do(c, &b_sent);
- if (ret != 0) {
- flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
- goto retry;
- }
- else {
- /* The request was issued successfully; validate the 'errors' field */
- flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
- if (c->resp.status != 200 && c->resp.status != 201) {
- if (c->resp.payload_size > 0) {
- flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
- c->resp.status, ctx->uri, c->resp.payload);
- }
- else {
- flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
- c->resp.status, ctx->uri);
- }
- goto retry;
- }
-
- if (c->resp.payload_size > 0) {
- /*
- * The Elasticsearch payload should be JSON; we convert it to msgpack
- * and look up the 'errors' field.
- */
- ret = elasticsearch_error_check(ctx, c);
- if (ret == FLB_TRUE) {
- /* we got an error */
- if (ctx->trace_error) {
- /*
- * If trace_error is set, trace the actual
- * response from Elasticsearch explaining the problem.
- * Trace_Output can be used to see the request.
- */
- if (pack_size < 4000) {
- flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
- (int) pack_size, pack);
- }
- if (c->resp.payload_size < 4000) {
- flb_plg_error(ctx->ins, "error: Output\n%s",
- c->resp.payload);
- } else {
- /*
- * We must use fwrite since the flb_log functions
- * will truncate data at 4KB
- */
- fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
- fflush(stderr);
- }
- }
- goto retry;
- }
- else {
- flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
- c->resp.payload);
- }
- }
- else {
- goto retry;
- }
- }
-
- /* Cleanup */
- flb_http_client_destroy(c);
- flb_free(pack);
- flb_upstream_conn_release(u_conn);
- if (signature) {
- flb_sds_destroy(signature);
- }
- FLB_OUTPUT_RETURN(FLB_OK);
-
- /* Issue a retry */
- retry:
- flb_http_client_destroy(c);
- flb_free(pack);
-
- if (out_buf != pack) {
- flb_free(out_buf);
- }
-
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_RETRY);
-}
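/*
 * For illustration, the request assembled above is a plain bulk POST; with
 * default settings against a local cluster it is expected to look roughly
 * like the following (header values assumed):
 *
 *   POST /_bulk HTTP/1.1
 *   Host: 127.0.0.1:9200
 *   Content-Type: application/x-ndjson
 *   Content-Encoding: gzip          (only when 'compress gzip' is set)
 *   Authorization: Basic ...        (only when http_user/http_passwd are set)
 *
 *   <newline-delimited bulk payload>
 */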
-
-static int cb_es_exit(void *data, struct flb_config *config)
-{
- struct flb_elasticsearch *ctx = data;
-
- flb_es_conf_destroy(ctx);
- return 0;
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
- "Set an index name"
- },
- {
- FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
- "Set the document type property"
- },
- {
- FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, suppress_type_name),
- "If true, mapping types is removed. (for v7.0.0 or later)"
- },
-
- /* HTTP Authentication */
- {
- FLB_CONFIG_MAP_STR, "http_user", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
- "Optional username credential for Elastic X-Pack access"
- },
- {
- FLB_CONFIG_MAP_STR, "http_passwd", "",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
- "Password for user defined in HTTP_User"
- },
-
- /* HTTP Compression */
- {
- FLB_CONFIG_MAP_STR, "compress", NULL,
- 0, FLB_FALSE, 0,
- "Set payload compression mechanism. Option available is 'gzip'"
- },
-
- /* Cloud Authentication */
- {
- FLB_CONFIG_MAP_STR, "cloud_id", NULL,
- 0, FLB_FALSE, 0,
- "Elastic cloud ID of the cluster to connect to"
- },
- {
- FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
- 0, FLB_FALSE, 0,
- "Elastic cloud authentication credentials"
- },
-
- /* AWS Authentication */
-#ifdef FLB_HAVE_AWS
- {
- FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth),
- "Enable AWS Sigv4 Authentication"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_region", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region),
- "AWS Region of your Amazon OpenSearch Service cluster"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_sts_endpoint),
- "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
- 0, FLB_FALSE, 0,
- "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
- 0, FLB_FALSE, 0,
- "External ID for the AWS IAM Role specified with `aws_role_arn`"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_service_name", "es",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_service_name),
- "AWS Service Name"
- },
- {
- FLB_CONFIG_MAP_STR, "aws_profile", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_profile),
- "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
- "$HOME/.aws/ directory."
- },
-#endif
-
- /* Logstash compatibility */
- {
- FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
- "Enable Logstash format compatibility"
- },
- {
- FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
- "When Logstash_Format is enabled, the Index name is composed using a prefix "
- "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
- "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
- "when the data is being generated"
- },
- {
- FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_separator),
- "Set a separator between logstash_prefix and date."
- },
- {
- FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
- "When included: the value in the record that belongs to the key will be looked "
- "up and over-write the Logstash_Prefix for index generation. If the key/value "
- "is not found in the record then the Logstash_Prefix option will act as a "
- "fallback. Nested keys are supported through record accessor pattern"
- },
- {
- FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
- "Time format (based on strftime) to generate the second part of the Index name"
- },
-
- /* Custom Time and Tag keys */
- {
- FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
- "When Logstash_Format is enabled, each record will get a new timestamp field. "
- "The Time_Key property defines the name of that field"
- },
- {
- FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
- "When Logstash_Format is enabled, this property defines the format of the "
- "timestamp"
- },
- {
- FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
- "When Logstash_Format is enabled, enabling this property sends nanosecond "
- "precision timestamps"
- },
- {
- FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
- "When enabled, it append the Tag name to the record"
- },
- {
- FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key),
- "When Include_Tag_Key is enabled, this property defines the key name for the tag"
- },
- {
- FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size),
- "Specify the buffer size used to read the response from the Elasticsearch HTTP "
- "service. This option is useful for debugging purposes where is required to read "
- "full responses, note that response size grows depending of the number of records "
- "inserted. To set an unlimited amount of memory set this value to 'false', "
- "otherwise the value must be according to the Unit Size specification"
- },
-
- /* Elasticsearch specifics */
- {
- FLB_CONFIG_MAP_STR, "path", NULL,
- 0, FLB_FALSE, 0,
- "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
- "possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
- "option defines such path on the fluent-bit side. It simply adds a path "
- "prefix in the indexing HTTP POST URI"
- },
- {
- FLB_CONFIG_MAP_STR, "pipeline", NULL,
- 0, FLB_FALSE, 0,
- "Newer versions of Elasticsearch allows to setup filters called pipelines. "
- "This option allows to define which pipeline the database should use. For "
- "performance reasons is strongly suggested to do parsing and filtering on "
- "Fluent Bit side, avoid pipelines"
- },
- {
- FLB_CONFIG_MAP_BOOL, "generate_id", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id),
- "When enabled, generate _id for outgoing records. This prevents duplicate "
- "records when retrying ES"
- },
- {
- FLB_CONFIG_MAP_STR, "write_operation", "create",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation),
- "Operation to use to write in bulk requests"
- },
- {
- FLB_CONFIG_MAP_STR, "id_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
- "If set, _id will be the value of the key from incoming record."
- },
- {
- FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
- "When enabled, replace field name dots with underscore, required by Elasticsearch "
- "2.0-2.3."
- },
-
- {
- FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index),
- "Use current time for index generation instead of message record"
- },
-
- /* Trace */
- {
- FLB_CONFIG_MAP_BOOL, "trace_output", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output),
- "When enabled print the Elasticsearch API calls to stdout (for diag only)"
- },
- {
- FLB_CONFIG_MAP_BOOL, "trace_error", "false",
- 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error),
- "When enabled print the Elasticsearch exception to stderr (for diag only)"
- },
-
- /* EOF */
- {0}
-};
-
-/* Plugin reference */
-struct flb_output_plugin out_es_plugin = {
- .name = "es",
- .description = "Elasticsearch",
- .cb_init = cb_es_init,
- .cb_pre_run = NULL,
- .cb_flush = cb_es_flush,
- .cb_exit = cb_es_exit,
- .workers = 2,
-
- /* Configuration */
- .config_map = config_map,
-
- /* Test */
- .test_formatter.callback = elasticsearch_format,
-
- /* Plugin flags */
- .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
-};
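/*
 * For reference, a minimal (assumed) classic-mode configuration exercising
 * the options mapped above; host, index and credentials are placeholders:
 *
 *   [OUTPUT]
 *       Name               es
 *       Match              *
 *       Host               es.example.com
 *       Port               9200
 *       Index              my_index
 *       Logstash_Format    On
 *       Logstash_Prefix    fluent-bit
 *       Replace_Dots       On
 *       Suppress_Type_Name On
 *       Compress           gzip
 *       tls                On
 *       HTTP_User          <user>
 *       HTTP_Passwd        <password>
 */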