summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_es
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_es')
-rw-r--r--fluent-bit/plugins/out_es/CMakeLists.txt8
-rw-r--r--fluent-bit/plugins/out_es/es.c1230
-rw-r--r--fluent-bit/plugins/out_es/es.h140
-rw-r--r--fluent-bit/plugins/out_es/es_bulk.c113
-rw-r--r--fluent-bit/plugins/out_es/es_bulk.h46
-rw-r--r--fluent-bit/plugins/out_es/es_conf.c537
-rw-r--r--fluent-bit/plugins/out_es/es_conf.h33
-rw-r--r--fluent-bit/plugins/out_es/murmur3.c314
-rw-r--r--fluent-bit/plugins/out_es/murmur3.h29
9 files changed, 2450 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_es/CMakeLists.txt b/fluent-bit/plugins/out_es/CMakeLists.txt
new file mode 100644
index 00000000..4fad4f27
--- /dev/null
+++ b/fluent-bit/plugins/out_es/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Source files for the Elasticsearch output plugin
+set(src
+ es_bulk.c
+ es_conf.c
+ es.c
+ murmur3.c)
+
+FLB_PLUGIN(out_es "${src}" "mk_core")
+# NOTE(review): no libraries are listed after the target, so this call is
+# effectively a no-op — confirm whether a dependency was dropped here
+target_link_libraries(flb-plugin-out_es)
diff --git a/fluent-bit/plugins/out_es/es.c b/fluent-bit/plugins/out_es/es.c
new file mode 100644
index 00000000..db2bcee5
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es.c
@@ -0,0 +1,1230 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <msgpack.h>
+
+#include <time.h>
+
+#include "es.h"
+#include "es_conf.h"
+#include "es_bulk.h"
+#include "murmur3.h"
+
+struct flb_output_plugin out_es_plugin;
+
+static int es_pack_array_content(msgpack_packer *tmp_pck,
+ msgpack_object array,
+ struct flb_elasticsearch *ctx);
+
+#ifdef FLB_HAVE_AWS
+/*
+ * Sign the outgoing HTTP request with AWS Sigv4.
+ *
+ * Returns the signature as an flb_sds_t (owned by the caller) or NULL
+ * when the host header cannot be normalized or signing fails.
+ */
+static flb_sds_t add_aws_auth(struct flb_http_client *c,
+                              struct flb_elasticsearch *ctx)
+{
+    flb_sds_t sig;
+
+    flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4");
+
+    /*
+     * Amazon OpenSearch Sigv4 does not allow the host header to include
+     * the port, so strip it before computing the signature.
+     */
+    if (flb_http_strip_port_from_host(c) < 0) {
+        flb_plg_error(ctx->ins, "could not strip port from host for sigv4");
+        return NULL;
+    }
+
+    /* AWS Fluent Bit user agent */
+    flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21);
+
+    sig = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL),
+                        ctx->aws_region, ctx->aws_service_name,
+                        S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers,
+                        ctx->aws_provider);
+    if (sig == NULL) {
+        flb_plg_error(ctx->ins, "could not sign request with sigv4");
+        return NULL;
+    }
+
+    return sig;
+}
+#endif /* FLB_HAVE_AWS */
+
+/*
+ * Re-pack a msgpack map while sanitizing key names.
+ *
+ * Each key is copied into a NUL-terminated buffer (heap-allocated only
+ * when it does not fit the 256-byte stack buffer) so that dots can be
+ * replaced in place when 'replace_dots' is enabled. Nested maps and
+ * arrays are rebuilt recursively so their keys are sanitized too.
+ *
+ * Returns 0 on success, -1 on allocation failure (in this call or any
+ * nested one — failures are now propagated instead of being dropped).
+ */
+static int es_pack_map_content(msgpack_packer *tmp_pck,
+                               msgpack_object map,
+                               struct flb_elasticsearch *ctx)
+{
+    int i;
+    int ret;
+    char *ptr_key = NULL;
+    char buf_key[256];
+    msgpack_object *k;
+    msgpack_object *v;
+
+    for (i = 0; i < map.via.map.size; i++) {
+        k = &map.via.map.ptr[i].key;
+        v = &map.via.map.ptr[i].val;
+        ptr_key = NULL;
+
+        /* Store key: keys may arrive either as BIN or STR objects */
+        const char *key_ptr = NULL;
+        size_t key_size = 0;
+
+        if (k->type == MSGPACK_OBJECT_BIN) {
+            key_ptr = k->via.bin.ptr;
+            key_size = k->via.bin.size;
+        }
+        else if (k->type == MSGPACK_OBJECT_STR) {
+            key_ptr = k->via.str.ptr;
+            key_size = k->via.str.size;
+        }
+
+        if (key_size < (sizeof(buf_key) - 1)) {
+            /* guard against memcpy(.., NULL, 0) when key is not a string */
+            if (key_size > 0) {
+                memcpy(buf_key, key_ptr, key_size);
+            }
+            buf_key[key_size] = '\0';
+            ptr_key = buf_key;
+        }
+        else {
+            /* Long map keys have a performance penalty */
+            ptr_key = flb_malloc(key_size + 1);
+            if (!ptr_key) {
+                flb_errno();
+                return -1;
+            }
+
+            memcpy(ptr_key, key_ptr, key_size);
+            ptr_key[key_size] = '\0';
+        }
+
+        /*
+         * Sanitize key name, Elastic Search 2.x don't allow dots
+         * in field names:
+         *
+         *   https://goo.gl/R5NMTr
+         */
+        if (ctx->replace_dots == FLB_TRUE) {
+            char *p = ptr_key;
+            char *end = ptr_key + key_size;
+            while (p != end) {
+                if (*p == '.') *p = '_';
+                p++;
+            }
+        }
+
+        /* Append the (possibly sanitized) key */
+        msgpack_pack_str(tmp_pck, key_size);
+        msgpack_pack_str_body(tmp_pck, ptr_key, key_size);
+
+        /* Release temporary key if it was heap-allocated */
+        if (ptr_key && ptr_key != buf_key) {
+            flb_free(ptr_key);
+        }
+        ptr_key = NULL;
+
+        /*
+         * The value can be any data type; maps and arrays must be
+         * rebuilt recursively so nested keys are sanitized as well.
+         * Propagate failures so the caller can abort this record.
+         */
+        if (v->type == MSGPACK_OBJECT_MAP) {
+            msgpack_pack_map(tmp_pck, v->via.map.size);
+            ret = es_pack_map_content(tmp_pck, *v, ctx);
+            if (ret == -1) {
+                return -1;
+            }
+        }
+        else if (v->type == MSGPACK_OBJECT_ARRAY) {
+            msgpack_pack_array(tmp_pck, v->via.array.size);
+            ret = es_pack_array_content(tmp_pck, *v, ctx);
+            if (ret == -1) {
+                return -1;
+            }
+        }
+        else {
+            msgpack_pack_object(tmp_pck, *v);
+        }
+    }
+    return 0;
+}
+
+/*
+ * Walk an array and sanitize every element. Mutually recursive with
+ * es_pack_map_content so maps nested inside arrays are rebuilt too.
+ * Always returns 0.
+ */
+static int es_pack_array_content(msgpack_packer *tmp_pck,
+                                 msgpack_object array,
+                                 struct flb_elasticsearch *ctx)
+{
+    int idx;
+    msgpack_object *entry;
+
+    for (idx = 0; idx < array.via.array.size; idx++) {
+        entry = &array.via.array.ptr[idx];
+
+        switch (entry->type) {
+        case MSGPACK_OBJECT_MAP:
+            msgpack_pack_map(tmp_pck, entry->via.map.size);
+            es_pack_map_content(tmp_pck, *entry, ctx);
+            break;
+        case MSGPACK_OBJECT_ARRAY:
+            msgpack_pack_array(tmp_pck, entry->via.array.size);
+            es_pack_array_content(tmp_pck, *entry, ctx);
+            break;
+        default:
+            msgpack_pack_object(tmp_pck, *entry);
+            break;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Extract the value used as '_id' from the incoming record.
+ * On success, return a newly created flb_sds_t owned by the caller.
+ * Return NULL when the key is missing, the value is not a string, or
+ * the copy cannot be allocated (a warning is logged in each case).
+ */
+static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
+                                 msgpack_object *map)
+{
+    flb_sds_t id_val = NULL;
+    struct flb_ra_value *rval;
+
+    rval = flb_ra_get_value_object(ctx->ra_id_key, *map);
+    if (!rval) {
+        flb_plg_warn(ctx->ins, "the value of %s is missing",
+                     ctx->id_key);
+        return NULL;
+    }
+
+    if (rval->o.type == MSGPACK_OBJECT_STR) {
+        id_val = flb_sds_create_len(rval->o.via.str.ptr,
+                                    rval->o.via.str.size);
+        if (id_val == NULL) {
+            flb_plg_warn(ctx->ins, "cannot create ID string from record");
+        }
+    }
+    else {
+        flb_plg_warn(ctx->ins, "the value of %s is not string",
+                     ctx->id_key);
+    }
+
+    /* single cleanup point for the record-accessor value */
+    flb_ra_key_value_destroy(rval);
+    return id_val;
+}
+
+/*
+ * Compose the Logstash-style index name: append the separator and the
+ * strftime-formatted date right after the prefix that is already stored
+ * at the start of 'logstash_index' (either the custom per-record prefix
+ * of length 'es_index_custom_len', or the configured logstash_prefix).
+ *
+ * Returns 0 on success, -1 when the result does not fit in the buffer.
+ */
+static int compose_index_header(struct flb_elasticsearch *ctx,
+                                int es_index_custom_len,
+                                char *logstash_index, size_t logstash_index_size,
+                                char *separator_str,
+                                struct tm *tm)
+{
+    int ret;
+    int len;
+    char *p;
+    size_t s;
+
+    /* Seek to the end of the prefix already present in the buffer */
+    if (es_index_custom_len > 0) {
+        p = logstash_index + es_index_custom_len;
+    } else {
+        p = logstash_index + flb_sds_len(ctx->logstash_prefix);
+    }
+    len = p - logstash_index;
+    ret = snprintf(p, logstash_index_size - len, "%s",
+                   separator_str);
+    /*
+     * snprintf truncates when its return value is >= the available space
+     * (the previous '>' check missed the ret == space case).
+     */
+    if (ret < 0 || (size_t) ret >= logstash_index_size - len) {
+        /* exceed limit */
+        return -1;
+    }
+    p += strlen(separator_str);
+    len += strlen(separator_str);
+
+    s = strftime(p, logstash_index_size - len,
+                 ctx->logstash_dateformat, tm);
+    if (s == 0) {
+        /* exceed limit */
+        return -1;
+    }
+    p += s;
+    *p++ = '\0';
+
+    return 0;
+}
+
+/*
+ * Convert the internal Fluent Bit data representation to the required
+ * one by Elasticsearch: a newline-delimited bulk payload where every
+ * record is preceded by an action/index header line ('j_index').
+ *
+ * 'Sadly' this process involves to convert from Msgpack to JSON.
+ *
+ * On success returns 0 and hands ownership of the serialized buffer to
+ * the caller through 'out_data'/'out_size'; returns -1 on failure.
+ */
+static int elasticsearch_format(struct flb_config *config,
+                                struct flb_input_instance *ins,
+                                void *plugin_context,
+                                void *flush_ctx,
+                                int event_type,
+                                const char *tag, int tag_len,
+                                const void *data, size_t bytes,
+                                void **out_data, size_t *out_size)
+{
+    int ret;
+    int len;
+    int map_size;
+    int index_len = 0;
+    size_t s = 0;
+    /*
+     * NOTE(review): 'off' is initialized here but never advanced after the
+     * switch to the log-event decoder, so 'off_prev' is always 0 when it is
+     * handed to es_bulk_append() below — confirm this is intended.
+     */
+    size_t off = 0;
+    size_t off_prev = 0;
+    char *es_index;
+    char logstash_index[256];
+    char time_formatted[256];
+    char index_formatted[256];
+    char es_uuid[37];
+    flb_sds_t out_buf;
+    size_t out_buf_len = 0;
+    flb_sds_t tmp_buf;
+    flb_sds_t id_key_str = NULL;
+    // msgpack_unpacked result;
+    // msgpack_object root;
+    msgpack_object map;
+    // msgpack_object *obj;
+    flb_sds_t j_index;
+    struct es_bulk *bulk;
+    struct tm tm;
+    struct flb_time tms;
+    msgpack_sbuffer tmp_sbuf;
+    msgpack_packer tmp_pck;
+    uint16_t hash[8];
+    int es_index_custom_len;
+    struct flb_elasticsearch *ctx = plugin_context;
+    struct flb_log_event_decoder log_decoder;
+    struct flb_log_event log_event;
+
+    /* Buffer holding the per-record bulk action/index header line */
+    j_index = flb_sds_create_size(ES_BULK_HEADER);
+    if (j_index == NULL) {
+        flb_errno();
+        return -1;
+    }
+
+    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+    if (ret != FLB_EVENT_DECODER_SUCCESS) {
+        flb_plg_error(ctx->ins,
+                      "Log event decoder initialization error : %d", ret);
+        flb_sds_destroy(j_index);
+
+        return -1;
+    }
+
+    /* Create the bulk composer */
+    bulk = es_bulk_create(bytes);
+    if (!bulk) {
+        flb_log_event_decoder_destroy(&log_decoder);
+        flb_sds_destroy(j_index);
+        return -1;
+    }
+
+    /* Copy logstash prefix if logstash format is enabled */
+    if (ctx->logstash_format == FLB_TRUE) {
+        strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
+        logstash_index[sizeof(logstash_index) - 1] = '\0';
+    }
+
+    /*
+     * If logstash format and id generation are disabled, pre-generate
+     * the index line for all records.
+     *
+     * The header stored in 'j_index' will be used for the all records on
+     * this payload.
+     */
+    if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
+        flb_time_get(&tms);
+        gmtime_r(&tms.tm.tv_sec, &tm);
+        /* 'index' may contain strftime directives, expand them once */
+        strftime(index_formatted, sizeof(index_formatted) - 1,
+                 ctx->index, &tm);
+        es_index = index_formatted;
+        if (ctx->suppress_type_name) {
+            index_len = flb_sds_snprintf(&j_index,
+                                         flb_sds_alloc(j_index),
+                                         ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                         ctx->es_action,
+                                         es_index);
+        }
+        else {
+            index_len = flb_sds_snprintf(&j_index,
+                                         flb_sds_alloc(j_index),
+                                         ES_BULK_INDEX_FMT,
+                                         ctx->es_action,
+                                         es_index, ctx->type);
+        }
+    }
+
+    /*
+     * Some broken clients may have time drift up to year 1970
+     * this will generate corresponding index in Elasticsearch
+     * in order to prevent generating millions of indexes
+     * we can set to always use current time for index generation
+     */
+    if (ctx->current_time_index == FLB_TRUE) {
+        flb_time_get(&tms);
+    }
+
+    /* Iterate each record and do further formatting */
+    while ((ret = flb_log_event_decoder_next(
+                    &log_decoder,
+                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+        /* Only pop time from record if current_time_index is disabled */
+        if (ctx->current_time_index == FLB_FALSE) {
+            flb_time_copy(&tms, &log_event.timestamp);
+        }
+
+        map = *log_event.body;
+        map_size = map.via.map.size;
+
+        /* Per-record index prefix override via record accessor */
+        es_index_custom_len = 0;
+        if (ctx->logstash_prefix_key) {
+            flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
+                                           (char *) tag, tag_len,
+                                           map, NULL);
+            if (v) {
+                len = flb_sds_len(v);
+                /* custom prefix is capped at 128 bytes */
+                if (len > 128) {
+                    len = 128;
+                    memcpy(logstash_index, v, 128);
+                }
+                else {
+                    memcpy(logstash_index, v, len);
+                }
+                es_index_custom_len = len;
+                flb_sds_destroy(v);
+            }
+        }
+
+        /* Create temporary msgpack buffer */
+        msgpack_sbuffer_init(&tmp_sbuf);
+        msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+        if (ctx->include_tag_key == FLB_TRUE) {
+            map_size++;
+        }
+
+        /* Set the new map size (+1 accounts for the injected time key) */
+        msgpack_pack_map(&tmp_pck, map_size + 1);
+
+        /* Append the time key */
+        msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key));
+        msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
+
+        /* Format the time (UTC), with milli- or nano-second suffix */
+        gmtime_r(&tms.tm.tv_sec, &tm);
+        s = strftime(time_formatted, sizeof(time_formatted) - 1,
+                     ctx->time_key_format, &tm);
+        if (ctx->time_key_nanos) {
+            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+                           ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
+        } else {
+            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+                           ".%03" PRIu64 "Z",
+                           (uint64_t) tms.tm.tv_nsec / 1000000);
+        }
+
+        s += len;
+        msgpack_pack_str(&tmp_pck, s);
+        msgpack_pack_str_body(&tmp_pck, time_formatted, s);
+
+        es_index = ctx->index;
+        if (ctx->logstash_format == FLB_TRUE) {
+            ret = compose_index_header(ctx, es_index_custom_len,
+                                       &logstash_index[0], sizeof(logstash_index),
+                                       ctx->logstash_prefix_separator, &tm);
+            if (ret < 0) {
+                /* retry with default separator */
+                compose_index_header(ctx, es_index_custom_len,
+                                     &logstash_index[0], sizeof(logstash_index),
+                                     "-", &tm);
+            }
+
+            es_index = logstash_index;
+            if (ctx->generate_id == FLB_FALSE) {
+                if (ctx->suppress_type_name) {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                                 ctx->es_action,
+                                                 es_index);
+                }
+                else {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT,
+                                                 ctx->es_action,
+                                                 es_index, ctx->type);
+                }
+            }
+        }
+        else if (ctx->current_time_index == FLB_TRUE) {
+            /* Make sure we handle index time format for index */
+            strftime(index_formatted, sizeof(index_formatted) - 1,
+                     ctx->index, &tm);
+            es_index = index_formatted;
+        }
+
+        /* Tag Key */
+        if (ctx->include_tag_key == FLB_TRUE) {
+            msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key));
+            msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
+            msgpack_pack_str(&tmp_pck, tag_len);
+            msgpack_pack_str_body(&tmp_pck, tag, tag_len);
+        }
+
+        /*
+         * The map_content routine iterate over each Key/Value pair found in
+         * the map and do some sanitization for the key names.
+         *
+         * Elasticsearch have a restriction that key names cannot contain
+         * a dot; if some dot is found, it's replaced with an underscore.
+         */
+        ret = es_pack_map_content(&tmp_pck, map, ctx);
+        if (ret == -1) {
+            flb_log_event_decoder_destroy(&log_decoder);
+            msgpack_sbuffer_destroy(&tmp_sbuf);
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+
+        /* Deterministic _id from a 128-bit hash of the packed record */
+        if (ctx->generate_id == FLB_TRUE) {
+            MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
+            snprintf(es_uuid, sizeof(es_uuid),
+                     "%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
+                     hash[0], hash[1], hash[2], hash[3],
+                     hash[4], hash[5], hash[6], hash[7]);
+            if (ctx->suppress_type_name) {
+                index_len = flb_sds_snprintf(&j_index,
+                                             flb_sds_alloc(j_index),
+                                             ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                             ctx->es_action,
+                                             es_index, es_uuid);
+            }
+            else {
+                index_len = flb_sds_snprintf(&j_index,
+                                             flb_sds_alloc(j_index),
+                                             ES_BULK_INDEX_FMT_ID,
+                                             ctx->es_action,
+                                             es_index, ctx->type, es_uuid);
+            }
+        }
+        /* id_key takes precedence over generate_id when both produce an id */
+        if (ctx->ra_id_key) {
+            id_key_str = es_get_id_value(ctx, &map);
+            if (id_key_str) {
+                if (ctx->suppress_type_name) {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                                 ctx->es_action,
+                                                 es_index, id_key_str);
+                }
+                else {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_ID,
+                                                 ctx->es_action,
+                                                 es_index, ctx->type, id_key_str);
+                }
+                flb_sds_destroy(id_key_str);
+                id_key_str = NULL;
+            }
+        }
+
+        /* Convert msgpack to JSON */
+        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
+        msgpack_sbuffer_destroy(&tmp_sbuf);
+        if (!out_buf) {
+            flb_log_event_decoder_destroy(&log_decoder);
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+
+        /*
+         * For update/upsert operations, wrap the document in the bulk
+         * operation body ("doc"/"doc_as_upsert") expected by Elasticsearch.
+         * NOTE(review): the flb_sds_create_len(NULL, ...) result is used
+         * without a NULL check, and snprintf writes into the sds without
+         * updating its internal length — confirm upstream behavior.
+         */
+        out_buf_len = flb_sds_len(out_buf);
+        if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
+            tmp_buf = out_buf;
+            out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
+            out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
+            flb_sds_destroy(tmp_buf);
+        }
+        else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
+            tmp_buf = out_buf;
+            out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
+            out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
+            flb_sds_destroy(tmp_buf);
+        }
+
+        /* Append header + JSON document to the bulk payload */
+        ret = es_bulk_append(bulk, j_index, index_len,
+                             out_buf, out_buf_len,
+                             bytes, off_prev);
+        flb_sds_destroy(out_buf);
+
+        off_prev = off;
+        if (ret == -1) {
+            /* We likely ran out of memory, abort here */
+            flb_log_event_decoder_destroy(&log_decoder);
+            *out_size = 0;
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+    }
+    flb_log_event_decoder_destroy(&log_decoder);
+
+    /* Set outgoing data */
+    *out_data = bulk->ptr;
+    *out_size = bulk->len;
+
+    /*
+     * Note: we don't destroy the bulk as we need to keep the allocated
+     * buffer with the data. Instead we just release the bulk context and
+     * return the bulk->ptr buffer
+     */
+    flb_free(bulk);
+    if (ctx->trace_output) {
+        fwrite(*out_data, 1, *out_size, stdout);
+        fflush(stdout);
+    }
+    flb_sds_destroy(j_index);
+    return 0;
+}
+
+/*
+ * Plugin initialization callback: build the Elasticsearch context from
+ * the output instance configuration, register it, and hook up the HTTP
+ * client debug callbacks. Returns 0 on success, -1 on failure.
+ */
+static int cb_es_init(struct flb_output_instance *ins,
+                      struct flb_config *config,
+                      void *data)
+{
+    struct flb_elasticsearch *ctx;
+
+    ctx = flb_es_conf_create(ins, config);
+    if (!ctx) {
+        flb_plg_error(ins, "cannot initialize plugin");
+        return -1;
+    }
+
+    flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s",
+                  ins->host.name, ins->host.port, ctx->uri,
+                  ctx->index, ctx->type);
+
+    flb_output_set_context(ins, ctx);
+
+    /*
+     * This plugin instance uses the HTTP client interface, let's register
+     * its debugging callbacks.
+     */
+    flb_output_set_http_debug_callbacks(ins);
+
+    return 0;
+}
+
+/*
+ * Inspect the Elasticsearch bulk API response and decide whether the
+ * chunk must be retried.
+ *
+ * Returns FLB_TRUE when an error was detected (or the response could not
+ * be validated) and FLB_FALSE when the bulk request fully succeeded.
+ * A 409 (version conflict / document already exists) per-item status is
+ * deliberately not treated as an error.
+ */
+static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
+                                     struct flb_http_client *c)
+{
+    int i, j, k;
+    int ret;
+    int check = FLB_FALSE;
+    int root_type;
+    char *out_buf;
+    size_t off = 0;
+    size_t out_size;
+    msgpack_unpacked result;
+    msgpack_object root;
+    msgpack_object key;
+    msgpack_object val;
+    msgpack_object item;
+    msgpack_object item_key;
+    msgpack_object item_val;
+
+    /*
+     * Check if our payload is complete: there is such situations where
+     * the Elasticsearch HTTP response body is bigger than the HTTP client
+     * buffer so payload can be incomplete.
+     */
+    /* Convert JSON payload to msgpack */
+    ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+                        &out_buf, &out_size, &root_type, NULL);
+    if (ret == -1) {
+        /* Is this an incomplete HTTP Request ? */
+        if (c->resp.payload_size <= 0) {
+            return FLB_TRUE;
+        }
+
+        /* Lookup error field */
+        if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
+            return FLB_FALSE;
+        }
+
+        flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
+                      c->resp.payload);
+        return FLB_TRUE;
+    }
+
+    /* Lookup error field */
+    msgpack_unpacked_init(&result);
+    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
+                      c->resp.payload);
+        /*
+         * Route through the cleanup label instead of returning directly:
+         * the direct return leaked 'out_buf' and the unpacked result.
+         */
+        check = FLB_TRUE;
+        goto done;
+    }
+
+    root = result.data;
+    if (root.type != MSGPACK_OBJECT_MAP) {
+        flb_plg_error(ctx->ins, "unexpected payload type=%i",
+                      root.type);
+        check = FLB_TRUE;
+        goto done;
+    }
+
+    for (i = 0; i < root.via.map.size; i++) {
+        key = root.via.map.ptr[i].key;
+        if (key.type != MSGPACK_OBJECT_STR) {
+            flb_plg_error(ctx->ins, "unexpected key type=%i",
+                          key.type);
+            check = FLB_TRUE;
+            goto done;
+        }
+
+        if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) {
+            val = root.via.map.ptr[i].val;
+            if (val.type != MSGPACK_OBJECT_BOOLEAN) {
+                flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
+                              val.type);
+                check = FLB_TRUE;
+                goto done;
+            }
+
+            /* If error == false, we are OK (no errors = FLB_FALSE) */
+            if (!val.via.boolean) {
+                /* no errors */
+                check = FLB_FALSE;
+                goto done;
+            }
+        }
+        else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) {
+            val = root.via.map.ptr[i].val;
+            if (val.type != MSGPACK_OBJECT_ARRAY) {
+                flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
+                              val.type);
+                check = FLB_TRUE;
+                goto done;
+            }
+
+            /* Scan every bulk item result for a failing status code */
+            for (j = 0; j < val.via.array.size; j++) {
+                item = val.via.array.ptr[j];
+                if (item.type != MSGPACK_OBJECT_MAP) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
+                                  item.type);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                if (item.via.map.size != 1) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
+                                  item.via.map.size);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                item = item.via.map.ptr[0].val;
+                if (item.type != MSGPACK_OBJECT_MAP) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
+                                  item.type);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                for (k = 0; k < item.via.map.size; k++) {
+                    item_key = item.via.map.ptr[k].key;
+                    if (item_key.type != MSGPACK_OBJECT_STR) {
+                        flb_plg_error(ctx->ins, "unexpected key type=%i",
+                                      item_key.type);
+                        check = FLB_TRUE;
+                        goto done;
+                    }
+
+                    if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) {
+                        item_val = item.via.map.ptr[k].val;
+
+                        if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+                            flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
+                                          item_val.type);
+                            check = FLB_TRUE;
+                            goto done;
+                        }
+                        /* Check for errors other than version conflict (document already exists) */
+                        if (item_val.via.i64 != 409) {
+                            check = FLB_TRUE;
+                            goto done;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+ done:
+    flb_free(out_buf);
+    msgpack_unpacked_destroy(&result);
+    return check;
+}
+
+/*
+ * Flush callback: serialize the event chunk into a bulk payload,
+ * optionally gzip it, POST it to Elasticsearch and validate the
+ * response. Ends by returning FLB_OK, FLB_RETRY or FLB_ERROR through
+ * FLB_OUTPUT_RETURN (this function does not return normally).
+ */
+static void cb_es_flush(struct flb_event_chunk *event_chunk,
+                        struct flb_output_flush *out_flush,
+                        struct flb_input_instance *ins, void *out_context,
+                        struct flb_config *config)
+{
+    int ret;
+    size_t pack_size;
+    char *pack;
+    void *out_buf;
+    size_t out_size;
+    size_t b_sent;
+    struct flb_elasticsearch *ctx = out_context;
+    struct flb_connection *u_conn;
+    struct flb_http_client *c;
+    flb_sds_t signature = NULL;
+    int compressed = FLB_FALSE;
+
+    /* Get upstream connection */
+    u_conn = flb_upstream_conn_get(ctx->u);
+    if (!u_conn) {
+        FLB_OUTPUT_RETURN(FLB_RETRY);
+    }
+
+    /* Convert format: 'out_buf' ownership moves to this function */
+    ret = elasticsearch_format(config, ins,
+                               ctx, NULL,
+                               event_chunk->type,
+                               event_chunk->tag, flb_sds_len(event_chunk->tag),
+                               event_chunk->data, event_chunk->size,
+                               &out_buf, &out_size);
+    if (ret != 0) {
+        flb_upstream_conn_release(u_conn);
+        FLB_OUTPUT_RETURN(FLB_ERROR);
+    }
+
+    pack = (char *) out_buf;
+    pack_size = out_size;
+
+    /* Should we compress the payload ? */
+    if (ctx->compress_gzip == FLB_TRUE) {
+        ret = flb_gzip_compress((void *) pack, pack_size,
+                                &out_buf, &out_size);
+        if (ret == -1) {
+            /* on failure the uncompressed payload is sent as-is */
+            flb_plg_error(ctx->ins,
+                          "cannot gzip payload, disabling compression");
+        }
+        else {
+            compressed = FLB_TRUE;
+        }
+
+        /*
+         * The payload buffer is different than pack, means we must be free it.
+         */
+        if (out_buf != pack) {
+            flb_free(pack);
+        }
+
+        pack = (char *) out_buf;
+        pack_size = out_size;
+    }
+
+    /*
+     * Compose HTTP Client request.
+     * NOTE(review): 'c' is used below without a NULL check — confirm
+     * flb_http_client() cannot fail here.
+     */
+    c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
+                        pack, pack_size, NULL, 0, NULL, 0);
+
+    flb_http_buffer_size(c, ctx->buffer_size);
+
+#ifndef FLB_HAVE_AWS
+    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+#endif
+
+    flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);
+
+    /* HTTP basic auth: explicit user/password takes precedence over cloud */
+    if (ctx->http_user && ctx->http_passwd) {
+        flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
+    }
+    else if (ctx->cloud_user && ctx->cloud_passwd) {
+        flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd);
+    }
+
+#ifdef FLB_HAVE_AWS
+    if (ctx->has_aws_auth == FLB_TRUE) {
+        signature = add_aws_auth(c, ctx);
+        if (!signature) {
+            goto retry;
+        }
+    }
+    else {
+        flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+    }
+#endif
+
+    /* Content Encoding: gzip */
+    if (compressed == FLB_TRUE) {
+        flb_http_set_content_encoding_gzip(c);
+    }
+
+    /* Map debug callbacks */
+    flb_http_client_debug(c, ctx->ins->callback);
+
+    ret = flb_http_do(c, &b_sent);
+    if (ret != 0) {
+        flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
+        goto retry;
+    }
+    else {
+        /* The request was issued successfully, validate the 'error' field */
+        flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
+        if (c->resp.status != 200 && c->resp.status != 201) {
+            if (c->resp.payload_size > 0) {
+                flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
+                              c->resp.status, ctx->uri, c->resp.payload);
+            }
+            else {
+                flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
+                              c->resp.status, ctx->uri);
+            }
+            goto retry;
+        }
+
+        if (c->resp.payload_size > 0) {
+            /*
+             * Elasticsearch payload should be JSON, we convert it to msgpack
+             * and lookup the 'error' field.
+             */
+            ret = elasticsearch_error_check(ctx, c);
+            if (ret == FLB_TRUE) {
+                /* we got an error */
+                if (ctx->trace_error) {
+                    /*
+                     * If trace_error is set, trace the actual
+                     * response from Elasticsearch explaining the problem.
+                     * Trace_Output can be used to see the request.
+                     */
+                    if (pack_size < 4000) {
+                        flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
+                                      (int) pack_size, pack);
+                    }
+                    if (c->resp.payload_size < 4000) {
+                        flb_plg_error(ctx->ins, "error: Output\n%s",
+                                      c->resp.payload);
+                    } else {
+                        /*
+                         * We must use fwrite since the flb_log functions
+                         * will truncate data at 4KB
+                         */
+                        fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
+                        fflush(stderr);
+                    }
+                }
+                goto retry;
+            }
+            else {
+                flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
+                              c->resp.payload);
+            }
+        }
+        else {
+            /* an empty 2xx response body is treated as unverifiable */
+            goto retry;
+        }
+    }
+
+    /* Cleanup */
+    flb_http_client_destroy(c);
+    flb_free(pack);
+    flb_upstream_conn_release(u_conn);
+    if (signature) {
+        flb_sds_destroy(signature);
+    }
+    FLB_OUTPUT_RETURN(FLB_OK);
+
+    /* Issue a retry */
+ retry:
+    flb_http_client_destroy(c);
+    flb_free(pack);
+
+    if (out_buf != pack) {
+        flb_free(out_buf);
+    }
+
+    flb_upstream_conn_release(u_conn);
+    FLB_OUTPUT_RETURN(FLB_RETRY);
+}
+
+/* Plugin exit callback: release the plugin context. Always returns 0. */
+static int cb_es_exit(void *data, struct flb_config *config)
+{
+    flb_es_conf_destroy((struct flb_elasticsearch *) data);
+
+    return 0;
+}
+
+/* Configuration properties map (terminated by the zeroed sentinel entry) */
+static struct flb_config_map config_map[] = {
+    {
+     FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
+     "Set an index name"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
+     "Set the document type property"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, suppress_type_name),
+     "If true, mapping types is removed. (for v7.0.0 or later)"
+    },
+
+    /* HTTP Authentication */
+    {
+     FLB_CONFIG_MAP_STR, "http_user", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
+     "Optional username credential for Elastic X-Pack access"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "http_passwd", "",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
+     "Password for user defined in HTTP_User"
+    },
+
+    /* HTTP Compression */
+    {
+     FLB_CONFIG_MAP_STR, "compress", NULL,
+     0, FLB_FALSE, 0,
+     "Set payload compression mechanism. Option available is 'gzip'"
+    },
+
+    /* Cloud Authentication */
+    {
+     FLB_CONFIG_MAP_STR, "cloud_id", NULL,
+     0, FLB_FALSE, 0,
+     "Elastic cloud ID of the cluster to connect to"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
+     0, FLB_FALSE, 0,
+     "Elastic cloud authentication credentials"
+    },
+
+    /* AWS Authentication */
+#ifdef FLB_HAVE_AWS
+    {
+     FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth),
+     "Enable AWS Sigv4 Authentication"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_region", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region),
+     "AWS Region of your Amazon OpenSearch Service cluster"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_sts_endpoint),
+     "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
+     0, FLB_FALSE, 0,
+     "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
+     0, FLB_FALSE, 0,
+     "External ID for the AWS IAM Role specified with `aws_role_arn`"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_service_name", "es",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_service_name),
+     "AWS Service Name"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_profile", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_profile),
+     "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
+     "$HOME/.aws/ directory."
+    },
+#endif
+
+    /* Logstash compatibility */
+    {
+     FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
+     "Enable Logstash format compatibility"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
+     "When Logstash_Format is enabled, the Index name is composed using a prefix "
+     "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
+     "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
+     "when the data is being generated"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_separator),
+     "Set a separator between logstash_prefix and date."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
+     "When included: the value in the record that belongs to the key will be looked "
+     "up and over-write the Logstash_Prefix for index generation. If the key/value "
+     "is not found in the record then the Logstash_Prefix option will act as a "
+     "fallback. Nested keys are supported through record accessor pattern"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
+     "Time format (based on strftime) to generate the second part of the Index name"
+    },
+
+    /* Custom Time and Tag keys */
+    {
+     FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
+     "When Logstash_Format is enabled, each record will get a new timestamp field. "
+     "The Time_Key property defines the name of that field"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
+     "When Logstash_Format is enabled, this property defines the format of the "
+     "timestamp"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
+     "When Logstash_Format is enabled, enabling this property sends nanosecond "
+     "precision timestamps"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
+     "When enabled, it append the Tag name to the record"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key),
+     "When Include_Tag_Key is enabled, this property defines the key name for the tag"
+    },
+    {
+     FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size),
+     "Specify the buffer size used to read the response from the Elasticsearch HTTP "
+     "service. This option is useful for debugging purposes where is required to read "
+     "full responses, note that response size grows depending of the number of records "
+     "inserted. To set an unlimited amount of memory set this value to 'false', "
+     "otherwise the value must be according to the Unit Size specification"
+    },
+
+    /* Elasticsearch specifics */
+    {
+     FLB_CONFIG_MAP_STR, "path", NULL,
+     0, FLB_FALSE, 0,
+     "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
+     "possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
+     "option defines such path on the fluent-bit side. It simply adds a path "
+     "prefix in the indexing HTTP POST URI"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "pipeline", NULL,
+     0, FLB_FALSE, 0,
+     "Newer versions of Elasticsearch allows to setup filters called pipelines. "
+     "This option allows to define which pipeline the database should use. For "
+     "performance reasons is strongly suggested to do parsing and filtering on "
+     "Fluent Bit side, avoid pipelines"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "generate_id", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id),
+     "When enabled, generate _id for outgoing records. This prevents duplicate "
+     "records when retrying ES"
+    },
+    /* Accepted values: index, create, update, upsert (see FLB_ES_WRITE_OP_*) */
+    {
+     FLB_CONFIG_MAP_STR, "write_operation", "create",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation),
+     "Operation to use to write in bulk requests"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "id_key", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
+     "If set, _id will be the value of the key from incoming record."
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
+     "When enabled, replace field name dots with underscore, required by Elasticsearch "
+     "2.0-2.3."
+    },
+
+    {
+     FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index),
+     "Use current time for index generation instead of message record"
+    },
+
+    /* Trace */
+    {
+     FLB_CONFIG_MAP_BOOL, "trace_output", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output),
+     "When enabled print the Elasticsearch API calls to stdout (for diag only)"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "trace_error", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error),
+     "When enabled print the Elasticsearch exception to stderr (for diag only)"
+    },
+
+    /* EOF */
+    {0}
+};
+
+/* Plugin reference: registration descriptor for the 'es' output plugin */
+struct flb_output_plugin out_es_plugin = {
+    .name         = "es",
+    .description  = "Elasticsearch",
+    .cb_init      = cb_es_init,
+    .cb_pre_run   = NULL,
+    .cb_flush     = cb_es_flush,
+    .cb_exit      = cb_es_exit,
+    .workers      = 2,
+
+    /* Configuration */
+    .config_map   = config_map,
+
+    /* Test: lets the test harness call the formatter directly */
+    .test_formatter.callback = elasticsearch_format,
+
+    /* Plugin flags: networked output with optional TLS */
+    .flags        = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
+};
diff --git a/fluent-bit/plugins/out_es/es.h b/fluent-bit/plugins/out_es/es.h
new file mode 100644
index 00000000..5d187049
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es.h
@@ -0,0 +1,140 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_ES_H
+#define FLB_OUT_ES_H
+
/* Default connection and formatting settings */
#define FLB_ES_DEFAULT_HOST "127.0.0.1"
/* Fixed: was 92000, which is not a valid TCP port (max 65535); the
 * plugin's actual default set via flb_output_net_default() is 9200 */
#define FLB_ES_DEFAULT_PORT 9200
#define FLB_ES_DEFAULT_INDEX "fluent-bit"
#define FLB_ES_DEFAULT_TYPE "_doc"
#define FLB_ES_DEFAULT_PREFIX "logstash"
#define FLB_ES_DEFAULT_TIME_FMT "%Y.%m.%d"
#define FLB_ES_DEFAULT_TIME_KEY "@timestamp"
#define FLB_ES_DEFAULT_TIME_KEYF "%Y-%m-%dT%H:%M:%S"
#define FLB_ES_DEFAULT_TAG_KEY "flb-key"
#define FLB_ES_DEFAULT_HTTP_MAX "512k"
#define FLB_ES_DEFAULT_HTTPS_PORT 443
/* Supported bulk write operations */
#define FLB_ES_WRITE_OP_INDEX "index"
#define FLB_ES_WRITE_OP_CREATE "create"
#define FLB_ES_WRITE_OP_UPDATE "update"
#define FLB_ES_WRITE_OP_UPSERT "upsert"
+
/* Runtime context for one instance of the Elasticsearch output plugin */
struct flb_elasticsearch {
    /* Elasticsearch index (database) and type (table) */
    char *index;
    char *type;
    /* boolean flag stored as char; presumably makes bulk headers omit
     * _type (needed for ES 7+) — confirm usage in es.c */
    char suppress_type_name;

    /* HTTP Auth */
    char *http_user;
    char *http_passwd;

    /* Elastic Cloud Auth (parsed from the "cloud_auth" property) */
    char *cloud_user;
    char *cloud_passwd;

    /* AWS Auth */
#ifdef FLB_HAVE_AWS
    int has_aws_auth;
    char *aws_region;
    char *aws_sts_endpoint;
    char *aws_profile;
    struct flb_aws_provider *aws_provider;
    struct flb_aws_provider *base_aws_provider;
    /* tls instances can't be re-used; aws provider requires a separate one */
    struct flb_tls *aws_tls;
    /* one for the standard chain provider, one for sts assume role */
    struct flb_tls *aws_sts_tls;
    char *aws_session_name;
    char *aws_service_name;
    struct mk_list *aws_unsigned_headers;
#endif

    /* HTTP Client Setup: response payload max size (0 == unlimited) */
    size_t buffer_size;

    /*
     * If enabled, replace field name dots with underscore, required for
     * Elasticsearch 2.0-2.3.
     */
    int replace_dots;

    /* diagnostics: dump API calls / exceptions to stdout/stderr */
    int trace_output;
    int trace_error;

    /*
     * Logstash compatibility options
     * ==============================
     */

    /* enabled/disabled */
    int logstash_format;
    int generate_id;
    int current_time_index;

    /* prefix */
    flb_sds_t logstash_prefix;
    flb_sds_t logstash_prefix_separator;

    /* prefix key */
    flb_sds_t logstash_prefix_key;

    /* date format */
    flb_sds_t logstash_dateformat;

    /* time key */
    flb_sds_t time_key;

    /* time key format */
    flb_sds_t time_key_format;

    /* time key nanoseconds */
    int time_key_nanos;


    /* write operation (config value: index/create/update/upsert) */
    flb_sds_t write_operation;
    /* write operation elasticsearch operation (resolved action string) */
    flb_sds_t es_action;

    /* id_key */
    flb_sds_t id_key;
    struct flb_record_accessor *ra_id_key;

    /* include_tag_key */
    int include_tag_key;
    flb_sds_t tag_key;

    /* Elasticsearch HTTP API endpoint path (e.g. "<path>/_bulk") */
    char uri[256];

    struct flb_record_accessor *ra_prefix_key;

    /* Compression mode (gzip) */
    int compress_gzip;

    /* Upstream connection to the backend server */
    struct flb_upstream *u;

    /* Plugin output instance reference */
    struct flb_output_instance *ins;
};
+
+#endif
diff --git a/fluent-bit/plugins/out_es/es_bulk.c b/fluent-bit/plugins/out_es/es_bulk.c
new file mode 100644
index 00000000..221f45eb
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es_bulk.c
@@ -0,0 +1,113 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+
+#include <fluent-bit.h>
+#include "es_bulk.h"
+
+struct es_bulk *es_bulk_create(size_t estimated_size)
+{
+ struct es_bulk *b;
+
+ if (estimated_size < ES_BULK_CHUNK) {
+ estimated_size = ES_BULK_CHUNK;
+ }
+
+ b = flb_malloc(sizeof(struct es_bulk));
+ if (!b) {
+ perror("calloc");
+ return NULL;
+ }
+ b->ptr = flb_malloc(estimated_size);
+ if (b->ptr == NULL) {
+ perror("malloc");
+ flb_free(b);
+ return NULL;
+ }
+
+ b->size = estimated_size;
+ b->len = 0;
+
+ return b;
+}
+
+void es_bulk_destroy(struct es_bulk *bulk)
+{
+ if (bulk->size > 0) {
+ flb_free(bulk->ptr);
+ }
+ flb_free(bulk);
+}
+
+int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
+ char *json, size_t j_len,
+ size_t whole_size, size_t converted_size)
+{
+ int available;
+ int append_size;
+ int required;
+ int remaining_size;
+ char *ptr;
+
+ required = i_len + j_len + ES_BULK_HEADER + 1;
+ available = (bulk->size - bulk->len);
+
+ if (available < required) {
+ /*
+ * estimate a converted size of json
+ * calculate
+ * 1. rest of msgpack data size
+ * 2. ratio from bulk json size and processed msgpack size.
+ */
+ append_size = required - available;
+ if (converted_size == 0) {
+ /* converted_size = 0 causes div/0 */
+ flb_debug("[out_es] converted_size is 0");
+ } else {
+ remaining_size = ceil((whole_size - converted_size) /* rest of size to convert */
+ * ((double)bulk->size / converted_size)); /* = json size / msgpack size */
+ append_size = fmax(append_size, remaining_size);
+ }
+ if (append_size < ES_BULK_CHUNK) {
+ /* append at least ES_BULK_CHUNK size */
+ append_size = ES_BULK_CHUNK;
+ }
+ ptr = flb_realloc(bulk->ptr, bulk->size + append_size);
+ if (!ptr) {
+ flb_errno();
+ return -1;
+ }
+ bulk->ptr = ptr;
+ bulk->size += append_size;
+ }
+
+ memcpy(bulk->ptr + bulk->len, index, i_len);
+ bulk->len += i_len;
+
+ memcpy(bulk->ptr + bulk->len, json, j_len);
+ bulk->len += j_len;
+ bulk->ptr[bulk->len] = '\n';
+ bulk->len++;
+
+ return 0;
+};
diff --git a/fluent-bit/plugins/out_es/es_bulk.h b/fluent-bit/plugins/out_es/es_bulk.h
new file mode 100644
index 00000000..7bb66dbb
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es_bulk.h
@@ -0,0 +1,46 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef FLB_OUT_ES_BULK_H
#define FLB_OUT_ES_BULK_H

#include <inttypes.h>

#define ES_BULK_CHUNK 4096 /* Size of buffer chunks */
#define ES_BULK_HEADER 165 /* ES Bulk API prefix line */
/* Bulk action header templates, with and without _type / _id */
#define ES_BULK_INDEX_FMT "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\"}}\n"
#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n"
/* Document bodies for update / upsert write operations */
#define ES_BULK_UPDATE_OP_BODY "{\"doc\":%s}"
#define ES_BULK_UPSERT_OP_BODY "{\"doc_as_upsert\":true,\"doc\":%s}"

/* Growable byte buffer holding a newline-delimited bulk request payload */
struct es_bulk {
    char *ptr;     /* payload buffer */
    uint32_t len;  /* bytes used */
    uint32_t size; /* bytes allocated */
};

struct es_bulk *es_bulk_create(size_t estimated_size);
int es_bulk_append(struct es_bulk *bulk, char *index, int i_len,
                   char *json, size_t j_len,
                   size_t whole_size, size_t curr_size);
void es_bulk_destroy(struct es_bulk *bulk);

#endif
diff --git a/fluent-bit/plugins/out_es/es_conf.c b/fluent-bit/plugins/out_es/es_conf.c
new file mode 100644
index 00000000..48c8c3e2
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es_conf.c
@@ -0,0 +1,537 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_base64.h>
+
+#include "es.h"
+#include "es_conf.h"
+
+/*
+ * extract_cloud_host extracts the public hostname
+ * of a deployment from a Cloud ID string.
+ *
+ * The Cloud ID string has the format "<deployment_name>:<base64_info>".
+ * Once decoded, the "base64_info" string has the format "<deployment_region>$<elasticsearch_hostname>$<kibana_hostname>"
+ * and the function returns "<elasticsearch_hostname>.<deployment_region>" token.
+ */
+static flb_sds_t extract_cloud_host(struct flb_elasticsearch *ctx,
+ const char *cloud_id)
+{
+
+ char *colon;
+ char *region;
+ char *host;
+ char *port = NULL;
+ char buf[256] = {0};
+ char cloud_host_buf[256] = {0};
+ const char dollar[2] = "$";
+ size_t len;
+ int ret;
+
+ /* keep only part after first ":" */
+ colon = strchr(cloud_id, ':');
+ if (colon == NULL) {
+ return NULL;
+ }
+ colon++;
+
+ /* decode base64 */
+ ret = flb_base64_decode((unsigned char *)buf, sizeof(buf), &len, (unsigned char *)colon, strlen(colon));
+ if (ret) {
+ flb_plg_error(ctx->ins, "cannot decode cloud_id");
+ return NULL;
+ }
+ region = strtok(buf, dollar);
+ if (region == NULL) {
+ return NULL;
+ }
+ host = strtok(NULL, dollar);
+ if (host == NULL) {
+ return NULL;
+ }
+
+ /*
+ * Some cloud id format is "<deployment_region>$<elasticsearch_hostname>:<port>$<kibana_hostname>" .
+ * e.g. https://github.com/elastic/beats/blob/v8.4.1/libbeat/cloudid/cloudid_test.go#L60
+ *
+ * It means the variable "host" can contains ':' and port number.
+ */
+ colon = strchr(host, ':');
+ if (colon != NULL) {
+ /* host contains host number */
+ *colon = '\0'; /* remove port number from host */
+ port = colon+1;
+ }
+
+ strcpy(cloud_host_buf, host);
+ strcat(cloud_host_buf, ".");
+ strcat(cloud_host_buf, region);
+ if (port != NULL) {
+ strcat(cloud_host_buf, ":");
+ strcat(cloud_host_buf, port);
+ }
+ return flb_sds_create(cloud_host_buf);
+}
+
+/*
+ * set_cloud_credentials gets a cloud_auth
+ * and sets the context's cloud_user and cloud_passwd.
+ * Example:
+ * cloud_auth = elastic:ZXVyb3BxxxxxxZTA1Ng
+ * ---->
+ * cloud_user = elastic
+ * cloud_passwd = ZXVyb3BxxxxxxZTA1Ng
+ */
+static void set_cloud_credentials(struct flb_elasticsearch *ctx,
+ const char *cloud_auth)
+{
+ /* extract strings */
+ int items = 0;
+ struct mk_list *toks;
+ struct mk_list *head;
+ struct flb_split_entry *entry;
+ toks = flb_utils_split((const char *)cloud_auth, ':', -1);
+ mk_list_foreach(head, toks) {
+ items++;
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ if (items == 1) {
+ ctx->cloud_user = flb_strdup(entry->value);
+ }
+ if (items == 2) {
+ ctx->cloud_passwd = flb_strdup(entry->value);
+ }
+ }
+ flb_utils_split_free(toks);
+}
+
+struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int len;
+ int io_flags = 0;
+ ssize_t ret;
+ char *buf;
+ const char *tmp;
+ const char *path;
+#ifdef FLB_HAVE_AWS
+ char *aws_role_arn = NULL;
+ char *aws_external_id = NULL;
+ char *aws_session_name = NULL;
+#endif
+ char *cloud_port_char;
+ char *cloud_host = NULL;
+ int cloud_host_port = 0;
+ int cloud_port = FLB_ES_DEFAULT_HTTPS_PORT;
+ struct flb_uri *uri = ins->host.uri;
+ struct flb_uri_field *f_index = NULL;
+ struct flb_uri_field *f_type = NULL;
+ struct flb_upstream *upstream;
+ struct flb_elasticsearch *ctx;
+
+ /* Allocate context */
+ ctx = flb_calloc(1, sizeof(struct flb_elasticsearch));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+
+ if (uri) {
+ if (uri->count >= 2) {
+ f_index = flb_uri_get(uri, 0);
+ f_type = flb_uri_get(uri, 1);
+ }
+ }
+
+ /* handle cloud_id */
+ tmp = flb_output_get_property("cloud_id", ins);
+ if (tmp) {
+ cloud_host = extract_cloud_host(ctx, tmp);
+ if (cloud_host == NULL) {
+ flb_plg_error(ctx->ins, "cannot extract cloud_host");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ flb_plg_debug(ctx->ins, "extracted cloud_host: '%s'", cloud_host);
+
+ cloud_port_char = strchr(cloud_host, ':');
+
+ if (cloud_port_char == NULL) {
+ flb_plg_debug(ctx->ins, "cloud_host: '%s' does not contain a port: '%s'", cloud_host, cloud_host);
+ }
+ else {
+ cloud_port_char[0] = '\0';
+ cloud_port_char = &cloud_port_char[1];
+ flb_plg_debug(ctx->ins, "extracted cloud_port_char: '%s'", cloud_port_char);
+ cloud_host_port = (int) strtol(cloud_port_char, (char **) NULL, 10);
+ flb_plg_debug(ctx->ins, "converted cloud_port_char to port int: '%i'", cloud_host_port);
+ }
+
+ if (cloud_host_port == 0) {
+ cloud_host_port = cloud_port;
+ }
+
+ flb_plg_debug(ctx->ins,
+ "checked whether extracted port was null and set it to "
+ "default https port or not. Outcome: '%i' and cloud_host: '%s'.",
+ cloud_host_port, cloud_host);
+
+ if (ins->host.name != NULL) {
+ flb_sds_destroy(ins->host.name);
+ }
+
+ ins->host.name = cloud_host;
+ ins->host.port = cloud_host_port;
+ }
+
+ /* Set default network configuration */
+ flb_output_net_default("127.0.0.1", 9200, ins);
+
+ /* Populate context with config map defaults and incoming properties */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "configuration error");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* handle cloud_auth */
+ tmp = flb_output_get_property("cloud_auth", ins);
+ if (tmp) {
+ set_cloud_credentials(ctx, tmp);
+ }
+
+ /* use TLS ? */
+ if (ins->use_tls == FLB_TRUE) {
+ io_flags = FLB_IO_TLS;
+ }
+ else {
+ io_flags = FLB_IO_TCP;
+ }
+
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ /* Compress (gzip) */
+ tmp = flb_output_get_property("compress", ins);
+ ctx->compress_gzip = FLB_FALSE;
+ if (tmp) {
+ if (strcasecmp(tmp, "gzip") == 0) {
+ ctx->compress_gzip = FLB_TRUE;
+ }
+ }
+
+ /* Prepare an upstream handler */
+ upstream = flb_upstream_create(config,
+ ins->host.name,
+ ins->host.port,
+ io_flags,
+ ins->tls);
+ if (!upstream) {
+ flb_plg_error(ctx->ins, "cannot create Upstream context");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ ctx->u = upstream;
+
+ /* Set instance flags into upstream */
+ flb_output_upstream_set(ctx->u, ins);
+
+ /* Set manual Index and Type */
+ if (f_index) {
+ ctx->index = flb_strdup(f_index->value); /* FIXME */
+ }
+
+ if (f_type) {
+ ctx->type = flb_strdup(f_type->value); /* FIXME */
+ }
+
+ /* HTTP Payload (response) maximum buffer size (0 == unlimited) */
+ if (ctx->buffer_size == -1) {
+ ctx->buffer_size = 0;
+ }
+
+ /* Elasticsearch: Path */
+ path = flb_output_get_property("path", ins);
+ if (!path) {
+ path = "";
+ }
+
+ /* Elasticsearch: Pipeline */
+ tmp = flb_output_get_property("pipeline", ins);
+ if (tmp) {
+ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp);
+ }
+ else {
+ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
+ }
+
+ if (ctx->id_key) {
+ ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
+ if (ctx->ra_id_key == NULL) {
+ flb_plg_error(ins, "could not create record accessor for Id Key");
+ }
+ if (ctx->generate_id == FLB_TRUE) {
+ flb_plg_warn(ins, "Generate_ID is ignored when ID_key is set");
+ ctx->generate_id = FLB_FALSE;
+ }
+ }
+
+ if (ctx->write_operation) {
+ if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) {
+ ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX);
+ }
+ else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) {
+ ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE);
+ }
+ else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0
+ || strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
+ ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE);
+ }
+ else {
+ flb_plg_error(ins, "wrong Write_Operation (should be one of index, create, update, upsert)");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ if (strcasecmp(ctx->es_action, FLB_ES_WRITE_OP_UPDATE) == 0
+ && !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) {
+ flb_plg_error(ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ if (ctx->logstash_prefix_key) {
+ if (ctx->logstash_prefix_key[0] != '$') {
+ len = flb_sds_len(ctx->logstash_prefix_key);
+ buf = flb_malloc(len + 2);
+ if (!buf) {
+ flb_errno();
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ buf[0] = '$';
+ memcpy(buf + 1, ctx->logstash_prefix_key, len);
+ buf[len + 1] = '\0';
+
+ ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
+ flb_free(buf);
+ }
+ else {
+ ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE);
+ }
+
+ if (!ctx->ra_prefix_key) {
+ flb_plg_error(ins, "invalid logstash_prefix_key pattern '%s'", tmp);
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+
+#ifdef FLB_HAVE_AWS
+ /* AWS Auth Unsigned Headers */
+ ctx->aws_unsigned_headers = flb_malloc(sizeof(struct mk_list));
+ if (ret != 0) {
+ flb_es_conf_destroy(ctx);
+ }
+ flb_slist_create(ctx->aws_unsigned_headers);
+ ret = flb_slist_add(ctx->aws_unsigned_headers, "Content-Length");
+ if (ret != 0) {
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* AWS Auth */
+ ctx->has_aws_auth = FLB_FALSE;
+ tmp = flb_output_get_property("aws_auth", ins);
+ if (tmp) {
+ if (strncasecmp(tmp, "On", 2) == 0) {
+ ctx->has_aws_auth = FLB_TRUE;
+ flb_debug("[out_es] Enabled AWS Auth");
+
+ /* AWS provider needs a separate TLS instance */
+ ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!ctx->aws_tls) {
+ flb_errno();
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ tmp = flb_output_get_property("aws_region", ins);
+ if (!tmp) {
+ flb_error("[out_es] aws_auth enabled but aws_region not set");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+ ctx->aws_region = (char *) tmp;
+
+ tmp = flb_output_get_property("aws_sts_endpoint", ins);
+ if (tmp) {
+ ctx->aws_sts_endpoint = (char *) tmp;
+ }
+
+ ctx->aws_provider = flb_standard_chain_provider_create(config,
+ ctx->aws_tls,
+ ctx->aws_region,
+ ctx->aws_sts_endpoint,
+ NULL,
+ flb_aws_client_generator(),
+ ctx->aws_profile);
+ if (!ctx->aws_provider) {
+ flb_error("[out_es] Failed to create AWS Credential Provider");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ tmp = flb_output_get_property("aws_role_arn", ins);
+ if (tmp) {
+ /* Use the STS Provider */
+ ctx->base_aws_provider = ctx->aws_provider;
+ aws_role_arn = (char *) tmp;
+ aws_external_id = NULL;
+ tmp = flb_output_get_property("aws_external_id", ins);
+ if (tmp) {
+ aws_external_id = (char *) tmp;
+ }
+
+ aws_session_name = flb_sts_session_name();
+ if (!aws_session_name) {
+ flb_error("[out_es] Failed to create aws iam role "
+ "session name");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* STS provider needs yet another separate TLS instance */
+ ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!ctx->aws_sts_tls) {
+ flb_errno();
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ctx->aws_provider = flb_sts_provider_create(config,
+ ctx->aws_sts_tls,
+ ctx->
+ base_aws_provider,
+ aws_external_id,
+ aws_role_arn,
+ aws_session_name,
+ ctx->aws_region,
+ ctx->aws_sts_endpoint,
+ NULL,
+ flb_aws_client_generator());
+ /* Session name can be freed once provider is created */
+ flb_free(aws_session_name);
+ if (!ctx->aws_provider) {
+ flb_error("[out_es] Failed to create AWS STS Credential "
+ "Provider");
+ flb_es_conf_destroy(ctx);
+ return NULL;
+ }
+
+ }
+
+ /* initialize credentials in sync mode */
+ ctx->aws_provider->provider_vtable->sync(ctx->aws_provider);
+ ctx->aws_provider->provider_vtable->init(ctx->aws_provider);
+ /* set back to async */
+ ctx->aws_provider->provider_vtable->async(ctx->aws_provider);
+ ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins);
+ }
+ }
+#endif
+
+ return ctx;
+}
+
/*
 * Release all resources owned by the Elasticsearch context.
 * Safe to call with a NULL or partially-initialized context.
 */
int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
{
    if (!ctx) {
        return 0;
    }

    if (ctx->u) {
        flb_upstream_destroy(ctx->u);
    }
    if (ctx->ra_id_key) {
        flb_ra_destroy(ctx->ra_id_key);
        ctx->ra_id_key = NULL;
    }
    if (ctx->es_action) {
        flb_free(ctx->es_action);
    }

#ifdef FLB_HAVE_AWS
    if (ctx->base_aws_provider) {
        flb_aws_provider_destroy(ctx->base_aws_provider);
    }

    if (ctx->aws_provider) {
        flb_aws_provider_destroy(ctx->aws_provider);
    }

    if (ctx->aws_tls) {
        flb_tls_destroy(ctx->aws_tls);
    }

    if (ctx->aws_sts_tls) {
        flb_tls_destroy(ctx->aws_sts_tls);
    }

    if (ctx->aws_unsigned_headers) {
        flb_slist_destroy(ctx->aws_unsigned_headers);
        flb_free(ctx->aws_unsigned_headers);
    }
#endif

    if (ctx->ra_prefix_key) {
        flb_ra_destroy(ctx->ra_prefix_key);
    }

    /* NOTE(review): ctx->index and ctx->type are flb_strdup'd in
     * flb_es_conf_create (marked FIXME there) but never freed here —
     * looks like a leak; confirm the config map does not own those
     * fields before adding frees (freeing config-map-owned memory
     * would double-free). */
    flb_free(ctx->cloud_passwd);
    flb_free(ctx->cloud_user);
    flb_free(ctx);

    return 0;
}
diff --git a/fluent-bit/plugins/out_es/es_conf.h b/fluent-bit/plugins/out_es/es_conf.h
new file mode 100644
index 00000000..3c421bec
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es_conf.h
@@ -0,0 +1,33 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef FLB_OUT_ES_CONF_H
#define FLB_OUT_ES_CONF_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_config.h>

#include "es.h"

/* Build the plugin context from the instance properties; NULL on error */
struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
                                             struct flb_config *config);
/* Destroy the context and every resource it owns; accepts NULL */
int flb_es_conf_destroy(struct flb_elasticsearch *ctx);

#endif
diff --git a/fluent-bit/plugins/out_es/murmur3.c b/fluent-bit/plugins/out_es/murmur3.c
new file mode 100644
index 00000000..8fccb6de
--- /dev/null
+++ b/fluent-bit/plugins/out_es/murmur3.c
@@ -0,0 +1,314 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "murmur3.h"
+
+//-----------------------------------------------------------------------------
+// Platform-specific functions and macros
+
+#ifdef __GNUC__
+#define FORCE_INLINE __attribute__((always_inline)) inline
+#else
+#define FORCE_INLINE inline
+#endif
+
/* 32-bit rotate left */
static FORCE_INLINE uint32_t rotl32 ( uint32_t x, int8_t r )
{
    return (x << r) | (x >> (32 - r));
}

/* 64-bit rotate left */
static FORCE_INLINE uint64_t rotl64 ( uint64_t x, int8_t r )
{
    return (x << r) | (x >> (64 - r));
}

#define ROTL32(x,y) rotl32(x,y)
#define ROTL64(x,y) rotl64(x,y)

#define BIG_CONSTANT(x) (x##LLU)

//-----------------------------------------------------------------------------
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here

#define getblock(p, i) (p[i])

//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche

static FORCE_INLINE uint32_t fmix32 ( uint32_t h )
{
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;

    return h;
}

//----------

static FORCE_INLINE uint64_t fmix64 ( uint64_t k )
{
    k ^= k >> 33;
    k *= BIG_CONSTANT(0xff51afd7ed558ccd);
    k ^= k >> 33;
    k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
    k ^= k >> 33;

    return k;
}
+
+//-----------------------------------------------------------------------------
+
/*
 * MurmurHash3, x86 32-bit variant: hashes 'len' bytes of 'key' mixed with
 * 'seed'; writes one uint32_t to 'out'.
 */
void MurmurHash3_x86_32 ( const void * key, int len,
                          uint32_t seed, void * out )
{
    const uint8_t * data = (const uint8_t*)key;
    const int nblocks = len / 4;
    int i;

    uint32_t h1 = seed;

    uint32_t c1 = 0xcc9e2d51;
    uint32_t c2 = 0x1b873593;

    //----------
    // body: mix one 4-byte block at a time

    const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);

    for(i = -nblocks; i; i++)
    {
        uint32_t k1 = getblock(blocks,i);

        k1 *= c1;
        k1 = ROTL32(k1,15);
        k1 *= c2;

        h1 ^= k1;
        h1 = ROTL32(h1,13);
        h1 = h1*5+0xe6546b64;
    }

    //----------
    // tail: the remaining 1-3 bytes (case fallthrough is intentional)

    const uint8_t * tail = (const uint8_t*)(data + nblocks*4);

    uint32_t k1 = 0;

    switch(len & 3)
    {
    case 3: k1 ^= tail[2] << 16; /* fall through */
    case 2: k1 ^= tail[1] << 8;  /* fall through */
    case 1: k1 ^= tail[0];
            k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
    };

    //----------
    // finalization

    h1 ^= len;

    h1 = fmix32(h1);

    *(uint32_t*)out = h1;
}
+
+//-----------------------------------------------------------------------------
+
/*
 * MurmurHash3, x86 128-bit variant: hashes 'len' bytes of 'key' mixed with
 * 'seed'; writes four uint32_t (16 bytes) to 'out'.
 */
void MurmurHash3_x86_128 ( const void * key, const int len,
                           uint32_t seed, void * out )
{
    const uint8_t * data = (const uint8_t*)key;
    const int nblocks = len / 16;
    int i;

    uint32_t h1 = seed;
    uint32_t h2 = seed;
    uint32_t h3 = seed;
    uint32_t h4 = seed;

    uint32_t c1 = 0x239b961b;
    uint32_t c2 = 0xab0e9789;
    uint32_t c3 = 0x38b34ae5;
    uint32_t c4 = 0xa1e38b93;

    //----------
    // body: mix one 16-byte block (4 lanes) at a time

    const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);

    for(i = -nblocks; i; i++)
    {
        uint32_t k1 = getblock(blocks,i*4+0);
        uint32_t k2 = getblock(blocks,i*4+1);
        uint32_t k3 = getblock(blocks,i*4+2);
        uint32_t k4 = getblock(blocks,i*4+3);

        k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;

        h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;

        k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;

        h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;

        k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;

        h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;

        k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;

        h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
    }

    //----------
    // tail: the remaining 1-15 bytes (case fallthrough is intentional)

    const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

    uint32_t k1 = 0;
    uint32_t k2 = 0;
    uint32_t k3 = 0;
    uint32_t k4 = 0;

    switch(len & 15)
    {
    case 15: k4 ^= tail[14] << 16; /* fall through */
    case 14: k4 ^= tail[13] << 8;  /* fall through */
    case 13: k4 ^= tail[12] << 0;
             k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
             /* fall through */
    case 12: k3 ^= tail[11] << 24; /* fall through */
    case 11: k3 ^= tail[10] << 16; /* fall through */
    case 10: k3 ^= tail[ 9] << 8;  /* fall through */
    case  9: k3 ^= tail[ 8] << 0;
             k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
             /* fall through */
    case  8: k2 ^= tail[ 7] << 24; /* fall through */
    case  7: k2 ^= tail[ 6] << 16; /* fall through */
    case  6: k2 ^= tail[ 5] << 8;  /* fall through */
    case  5: k2 ^= tail[ 4] << 0;
             k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
             /* fall through */
    case  4: k1 ^= tail[ 3] << 24; /* fall through */
    case  3: k1 ^= tail[ 2] << 16; /* fall through */
    case  2: k1 ^= tail[ 1] << 8;  /* fall through */
    case  1: k1 ^= tail[ 0] << 0;
             k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
    };

    //----------
    // finalization

    h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;

    h1 += h2; h1 += h3; h1 += h4;
    h2 += h1; h3 += h1; h4 += h1;

    h1 = fmix32(h1);
    h2 = fmix32(h2);
    h3 = fmix32(h3);
    h4 = fmix32(h4);

    h1 += h2; h1 += h3; h1 += h4;
    h2 += h1; h3 += h1; h4 += h1;

    ((uint32_t*)out)[0] = h1;
    ((uint32_t*)out)[1] = h2;
    ((uint32_t*)out)[2] = h3;
    ((uint32_t*)out)[3] = h4;
}
+
+//-----------------------------------------------------------------------------
+
/*
 * MurmurHash3, x64 128-bit variant: hashes 'len' bytes of 'key' mixed with
 * 'seed'; writes two uint64_t (16 bytes) to 'out'.
 */
void MurmurHash3_x64_128 ( const void * key, const int len,
                           const uint32_t seed, void * out )
{
    const uint8_t * data = (const uint8_t*)key;
    const int nblocks = len / 16;
    int i;

    uint64_t h1 = seed;
    uint64_t h2 = seed;

    uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
    uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);

    //----------
    // body: mix one 16-byte block (2 lanes) at a time

    const uint64_t * blocks = (const uint64_t *)(data);

    for(i = 0; i < nblocks; i++)
    {
        uint64_t k1 = getblock(blocks,i*2+0);
        uint64_t k2 = getblock(blocks,i*2+1);

        k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;

        h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;

        k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;

        h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
    }

    //----------
    // tail: the remaining 1-15 bytes (case fallthrough is intentional)

    const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

    uint64_t k1 = 0;
    uint64_t k2 = 0;

    switch(len & 15)
    {
    case 15: k2 ^= (uint64_t)(tail[14]) << 48; /* fall through */
    case 14: k2 ^= (uint64_t)(tail[13]) << 40; /* fall through */
    case 13: k2 ^= (uint64_t)(tail[12]) << 32; /* fall through */
    case 12: k2 ^= (uint64_t)(tail[11]) << 24; /* fall through */
    case 11: k2 ^= (uint64_t)(tail[10]) << 16; /* fall through */
    case 10: k2 ^= (uint64_t)(tail[ 9]) << 8;  /* fall through */
    case  9: k2 ^= (uint64_t)(tail[ 8]) << 0;
             k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
             /* fall through */
    case  8: k1 ^= (uint64_t)(tail[ 7]) << 56; /* fall through */
    case  7: k1 ^= (uint64_t)(tail[ 6]) << 48; /* fall through */
    case  6: k1 ^= (uint64_t)(tail[ 5]) << 40; /* fall through */
    case  5: k1 ^= (uint64_t)(tail[ 4]) << 32; /* fall through */
    case  4: k1 ^= (uint64_t)(tail[ 3]) << 24; /* fall through */
    case  3: k1 ^= (uint64_t)(tail[ 2]) << 16; /* fall through */
    case  2: k1 ^= (uint64_t)(tail[ 1]) << 8;  /* fall through */
    case  1: k1 ^= (uint64_t)(tail[ 0]) << 0;
             k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
    };

    //----------
    // finalization

    h1 ^= len; h2 ^= len;

    h1 += h2;
    h2 += h1;

    h1 = fmix64(h1);
    h2 = fmix64(h2);

    h1 += h2;
    h2 += h1;

    ((uint64_t*)out)[0] = h1;
    ((uint64_t*)out)[1] = h2;
}
+
+//-----------------------------------------------------------------------------
diff --git a/fluent-bit/plugins/out_es/murmur3.h b/fluent-bit/plugins/out_es/murmur3.h
new file mode 100644
index 00000000..c85395a1
--- /dev/null
+++ b/fluent-bit/plugins/out_es/murmur3.h
@@ -0,0 +1,29 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the
+// public domain. The author hereby disclaims copyright to this source
+// code.
+
/* NOTE(review): the guard name _MURMURHASH3_H_ (underscore + uppercase)
 * is a reserved identifier per C11 7.1.3; kept as-is to stay in sync
 * with the vendored upstream source. */
#ifndef _MURMURHASH3_H_
#define _MURMURHASH3_H_

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

//-----------------------------------------------------------------------------

/* 32-bit hash, x86-tuned; 'out' receives 4 bytes (one uint32_t) */
void MurmurHash3_x86_32 (const void *key, int len, uint32_t seed, void *out);

/* 128-bit hash, x86-tuned; 'out' receives 16 bytes */
void MurmurHash3_x86_128(const void *key, int len, uint32_t seed, void *out);

/* 128-bit hash, x64-tuned; 'out' receives 16 bytes */
void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out);

//-----------------------------------------------------------------------------

#ifdef __cplusplus
}
#endif

#endif // _MURMURHASH3_H_