Diffstat (limited to 'fluent-bit/plugins/out_es')
 fluent-bit/plugins/out_es/CMakeLists.txt |    8 +
 fluent-bit/plugins/out_es/es.c           | 1230 ++++++++++++++++++++
 fluent-bit/plugins/out_es/es.h           |  140 +++
 fluent-bit/plugins/out_es/es_bulk.c      |  113 ++
 fluent-bit/plugins/out_es/es_bulk.h      |   46 +
 fluent-bit/plugins/out_es/es_conf.c      |  537 +++++++
 fluent-bit/plugins/out_es/es_conf.h      |   33 +
 fluent-bit/plugins/out_es/murmur3.c      |  314 ++++
 fluent-bit/plugins/out_es/murmur3.h      |   29 +
9 files changed, 2450 insertions(+), 0 deletions(-)
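A note for readers: as defined in es_bulk.h below, every record becomes two newline-delimited lines in the bulk payload that cb_es_flush POSTs to the /_bulk endpoint, an action line built from one of the ES_BULK_INDEX_FMT* formats followed by the JSON document. An illustrative sketch with the default write_operation 'create', index 'fluent-bit' and type '_doc' (the record fields are made-up values):

  {"create":{"_index":"fluent-bit","_type":"_doc"}}
  {"@timestamp":"2022-01-01T13:45:30.000Z","log":"hello world"}

For write_operation 'update' or 'upsert' the document line is additionally wrapped per ES_BULK_UPDATE_OP_BODY / ES_BULK_UPSERT_OP_BODY, e.g. {"doc_as_upsert":true,"doc":{...}}.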
diff --git a/fluent-bit/plugins/out_es/CMakeLists.txt b/fluent-bit/plugins/out_es/CMakeLists.txt
new file mode 100644
index 00000000..4fad4f27
--- /dev/null
+++ b/fluent-bit/plugins/out_es/CMakeLists.txt
@@ -0,0 +1,8 @@
+set(src
+  es_bulk.c
+  es_conf.c
+  es.c
+  murmur3.c)
+
+FLB_PLUGIN(out_es "${src}" "mk_core")
+target_link_libraries(flb-plugin-out_es)
diff --git a/fluent-bit/plugins/out_es/es.c b/fluent-bit/plugins/out_es/es.c
new file mode 100644
index 00000000..db2bcee5
--- /dev/null
+++ b/fluent-bit/plugins/out_es/es.c
@@ -0,0 +1,1230 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <msgpack.h>
+
+#include <time.h>
+
+#include "es.h"
+#include "es_conf.h"
+#include "es_bulk.h"
+#include "murmur3.h"
+
+struct flb_output_plugin out_es_plugin;
+
+static int es_pack_array_content(msgpack_packer *tmp_pck,
+                                 msgpack_object array,
+                                 struct flb_elasticsearch *ctx);
+
+#ifdef FLB_HAVE_AWS
+static flb_sds_t add_aws_auth(struct flb_http_client *c,
+                              struct flb_elasticsearch *ctx)
+{
+    flb_sds_t signature = NULL;
+    int ret;
+
+    flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4");
+
+    /* Amazon OpenSearch Sigv4 does not allow the host header to include the port */
+    ret = flb_http_strip_port_from_host(c);
+    if (ret < 0) {
+        flb_plg_error(ctx->ins, "could not strip port from host for sigv4");
+        return NULL;
+    }
+
+    /* AWS Fluent Bit user agent */
+    flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21);
+
+    signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL),
+                              ctx->aws_region, ctx->aws_service_name,
+                              S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers,
+                              ctx->aws_provider);
+    if (!signature) {
+        flb_plg_error(ctx->ins, "could not sign request with sigv4");
+        return NULL;
+    }
+    return signature;
+}
+#endif /* FLB_HAVE_AWS */
+
+static int es_pack_map_content(msgpack_packer *tmp_pck,
+                               msgpack_object map,
+                               struct flb_elasticsearch *ctx)
+{
+    int i;
+    char *ptr_key = NULL;
+    char buf_key[256];
+    msgpack_object *k;
+    msgpack_object *v;
+
+    for (i = 0; i < map.via.map.size; i++) {
+        k = &map.via.map.ptr[i].key;
+        v = &map.via.map.ptr[i].val;
+        ptr_key = NULL;
+
+        /* Store key */
+        const char *key_ptr = NULL;
+        size_t key_size = 0;
+
+        if (k->type == MSGPACK_OBJECT_BIN) {
+            key_ptr  = k->via.bin.ptr;
+            key_size = k->via.bin.size;
+        }
+        else if (k->type == MSGPACK_OBJECT_STR) {
+            key_ptr  = k->via.str.ptr;
+            key_size = k->via.str.size;
+        }
+
+        if (key_size < (sizeof(buf_key) - 1)) {
+            memcpy(buf_key, key_ptr, key_size);
+            buf_key[key_size] = '\0';
+            ptr_key = buf_key;
+        }
+        else {
+            /* Long map keys have a performance penalty */
+            ptr_key = flb_malloc(key_size + 1);
+            if (!ptr_key) {
+                flb_errno();
+                return -1;
+            }
+
+            memcpy(ptr_key, key_ptr, key_size);
+            ptr_key[key_size] = '\0';
+        }
+
+        /*
+         * Sanitize the key name: Elasticsearch 2.x doesn't allow dots
+         * in field names:
+         *
+         *     https://goo.gl/R5NMTr
+         */
+        if (ctx->replace_dots == FLB_TRUE) {
+            char *p   = ptr_key;
+            char *end = ptr_key + key_size;
+            while (p != end) {
+                if (*p == '.') *p = '_';
+                p++;
+            }
+        }
+
+        /* Append the key */
+        msgpack_pack_str(tmp_pck, key_size);
+        msgpack_pack_str_body(tmp_pck, ptr_key, key_size);
+
+        /* Release the temporary key if it was allocated */
+        if (ptr_key && ptr_key != buf_key) {
+            flb_free(ptr_key);
+        }
+        ptr_key = NULL;
+
+        /*
+         * The value can be any data type; if it's a map we need to
+         * sanitize it to avoid dots.
+         */
+        if (v->type == MSGPACK_OBJECT_MAP) {
+            msgpack_pack_map(tmp_pck, v->via.map.size);
+            es_pack_map_content(tmp_pck, *v, ctx);
+        }
+        /*
+         * The value can be any data type; if it's an array we need to
+         * pass it to es_pack_array_content.
+         */
+        else if (v->type == MSGPACK_OBJECT_ARRAY) {
+            msgpack_pack_array(tmp_pck, v->via.array.size);
+            es_pack_array_content(tmp_pck, *v, ctx);
+        }
+        else {
+            msgpack_pack_object(tmp_pck, *v);
+        }
+    }
+    return 0;
+}
+
+/*
+ * Iterate through the array and sanitize its elements.
+ * Mutual recursion with es_pack_map_content.
+ */
+static int es_pack_array_content(msgpack_packer *tmp_pck,
+                                 msgpack_object array,
+                                 struct flb_elasticsearch *ctx)
+{
+    int i;
+    msgpack_object *e;
+
+    for (i = 0; i < array.via.array.size; i++) {
+        e = &array.via.array.ptr[i];
+        if (e->type == MSGPACK_OBJECT_MAP) {
+            msgpack_pack_map(tmp_pck, e->via.map.size);
+            es_pack_map_content(tmp_pck, *e, ctx);
+        }
+        else if (e->type == MSGPACK_OBJECT_ARRAY) {
+            msgpack_pack_array(tmp_pck, e->via.array.size);
+            es_pack_array_content(tmp_pck, *e, ctx);
+        }
+        else {
+            msgpack_pack_object(tmp_pck, *e);
+        }
+    }
+    return 0;
+}
+
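+/*
+ * Example: with Replace_Dots enabled, a record key such as "log.level"
+ * (a made-up field name, for illustration) is emitted as "log_level"
+ * before the map is serialized to JSON.
+ */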
+
+/*
+ * Get the _id value from the incoming record.
+ * If it succeeds, return the value as flb_sds_t.
+ * If it fails, return NULL.
+ */
+static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
+                                 msgpack_object *map)
+{
+    struct flb_ra_value *rval = NULL;
+    flb_sds_t tmp_str;
+
+    rval = flb_ra_get_value_object(ctx->ra_id_key, *map);
+    if (rval == NULL) {
+        flb_plg_warn(ctx->ins, "the value of %s is missing",
+                     ctx->id_key);
+        return NULL;
+    }
+    else if (rval->o.type != MSGPACK_OBJECT_STR) {
+        flb_plg_warn(ctx->ins, "the value of %s is not a string",
+                     ctx->id_key);
+        flb_ra_key_value_destroy(rval);
+        return NULL;
+    }
+
+    tmp_str = flb_sds_create_len(rval->o.via.str.ptr,
+                                 rval->o.via.str.size);
+    if (tmp_str == NULL) {
+        flb_plg_warn(ctx->ins, "cannot create ID string from record");
+        flb_ra_key_value_destroy(rval);
+        return NULL;
+    }
+    flb_ra_key_value_destroy(rval);
+    return tmp_str;
+}
+
+static int compose_index_header(struct flb_elasticsearch *ctx,
+                                int es_index_custom_len,
+                                char *logstash_index, size_t logstash_index_size,
+                                char *separator_str,
+                                struct tm *tm)
+{
+    int ret;
+    int len;
+    char *p;
+    size_t s;
+
+    /* Compose the index header */
+    if (es_index_custom_len > 0) {
+        p = logstash_index + es_index_custom_len;
+    }
+    else {
+        p = logstash_index + flb_sds_len(ctx->logstash_prefix);
+    }
+    len = p - logstash_index;
+    ret = snprintf(p, logstash_index_size - len, "%s",
+                   separator_str);
+    if (ret > logstash_index_size - len) {
+        /* exceeded the limit */
+        return -1;
+    }
+    p += strlen(separator_str);
+    len += strlen(separator_str);
+
+    s = strftime(p, logstash_index_size - len,
+                 ctx->logstash_dateformat, tm);
+    if (s == 0) {
+        /* exceeded the limit */
+        return -1;
+    }
+    p += s;
+    *p++ = '\0';
+
+    return 0;
+}
+
+/*
+ * Convert the internal Fluent Bit data representation to the one
+ * required by Elasticsearch.
+ *
+ * 'Sadly' this process involves converting from Msgpack to JSON.
+ */
+static int elasticsearch_format(struct flb_config *config,
+                                struct flb_input_instance *ins,
+                                void *plugin_context,
+                                void *flush_ctx,
+                                int event_type,
+                                const char *tag, int tag_len,
+                                const void *data, size_t bytes,
+                                void **out_data, size_t *out_size)
+{
+    int ret;
+    int len;
+    int map_size;
+    int index_len = 0;
+    size_t s = 0;
+    size_t off = 0;
+    size_t off_prev = 0;
+    char *es_index;
+    char logstash_index[256];
+    char time_formatted[256];
+    char index_formatted[256];
+    char es_uuid[37];
+    flb_sds_t out_buf;
+    size_t out_buf_len = 0;
+    flb_sds_t tmp_buf;
+    flb_sds_t id_key_str = NULL;
+    // msgpack_unpacked result;
+    // msgpack_object root;
+    msgpack_object map;
+    // msgpack_object *obj;
+    flb_sds_t j_index;
+    struct es_bulk *bulk;
+    struct tm tm;
+    struct flb_time tms;
+    msgpack_sbuffer tmp_sbuf;
+    msgpack_packer tmp_pck;
+    uint16_t hash[8];
+    int es_index_custom_len;
+    struct flb_elasticsearch *ctx = plugin_context;
+    struct flb_log_event_decoder log_decoder;
+    struct flb_log_event log_event;
+
+    j_index = flb_sds_create_size(ES_BULK_HEADER);
+    if (j_index == NULL) {
+        flb_errno();
+        return -1;
+    }
+
+    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+    if (ret != FLB_EVENT_DECODER_SUCCESS) {
+        flb_plg_error(ctx->ins,
+                      "Log event decoder initialization error : %d", ret);
+        flb_sds_destroy(j_index);
+
+        return -1;
+    }
+
+    /* Create the bulk composer */
+    bulk = es_bulk_create(bytes);
+    if (!bulk) {
+        flb_log_event_decoder_destroy(&log_decoder);
+        flb_sds_destroy(j_index);
+        return -1;
+    }
+
+    /* Copy the logstash prefix if logstash format is enabled */
+    if (ctx->logstash_format == FLB_TRUE) {
+        strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
+        logstash_index[sizeof(logstash_index) - 1] = '\0';
+    }
+
+    /*
+     * If logstash format and id generation are disabled, pre-generate
+     * the index line for all records.
+     *
+     * The header stored in 'j_index' will be used for all the records in
+     * this payload.
+     */
+    if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
+        flb_time_get(&tms);
+        gmtime_r(&tms.tm.tv_sec, &tm);
+        strftime(index_formatted, sizeof(index_formatted) - 1,
+                 ctx->index, &tm);
+        es_index = index_formatted;
+        if (ctx->suppress_type_name) {
+            index_len = flb_sds_snprintf(&j_index,
+                                         flb_sds_alloc(j_index),
+                                         ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                         ctx->es_action,
+                                         es_index);
+        }
+        else {
+            index_len = flb_sds_snprintf(&j_index,
+                                         flb_sds_alloc(j_index),
+                                         ES_BULK_INDEX_FMT,
+                                         ctx->es_action,
+                                         es_index, ctx->type);
+        }
+    }
+
+    /*
+     * Some broken clients may have a time drift back to year 1970, which
+     * would generate corresponding indexes in Elasticsearch. In order to
+     * prevent generating millions of indexes, we can set the plugin to
+     * always use the current time for index generation.
+     */
+    if (ctx->current_time_index == FLB_TRUE) {
+        flb_time_get(&tms);
+    }
+
+    /* Iterate over each record and do further formatting */
+    while ((ret = flb_log_event_decoder_next(
+                    &log_decoder,
+                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+        /* Only pop the time from the record if current_time_index is disabled */
+        if (ctx->current_time_index == FLB_FALSE) {
+            flb_time_copy(&tms, &log_event.timestamp);
+        }
+
+        map = *log_event.body;
+        map_size = map.via.map.size;
+
+        es_index_custom_len = 0;
+        if (ctx->logstash_prefix_key) {
+            flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
+                                           (char *) tag, tag_len,
+                                           map, NULL);
+            if (v) {
+                len = flb_sds_len(v);
+                if (len > 128) {
+                    len = 128;
+                    memcpy(logstash_index, v, 128);
+                }
+                else {
+                    memcpy(logstash_index, v, len);
+                }
+                es_index_custom_len = len;
+                flb_sds_destroy(v);
+            }
+        }
+
+        /* Create a temporary msgpack buffer */
+        msgpack_sbuffer_init(&tmp_sbuf);
+        msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+        if (ctx->include_tag_key == FLB_TRUE) {
+            map_size++;
+        }
+
+        /* Set the new map size */
+        msgpack_pack_map(&tmp_pck, map_size + 1);
+
+        /* Append the time key */
+        msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key));
+        msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
+
+        /* Format the time */
+        gmtime_r(&tms.tm.tv_sec, &tm);
+        s = strftime(time_formatted, sizeof(time_formatted) - 1,
+                     ctx->time_key_format, &tm);
+        if (ctx->time_key_nanos) {
+            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+                           ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
+        }
+        else {
+            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+                           ".%03" PRIu64 "Z",
+                           (uint64_t) tms.tm.tv_nsec / 1000000);
+        }
+
+        s += len;
+        msgpack_pack_str(&tmp_pck, s);
+        msgpack_pack_str_body(&tmp_pck, time_formatted, s);
+
+        es_index = ctx->index;
+        if (ctx->logstash_format == FLB_TRUE) {
+            ret = compose_index_header(ctx, es_index_custom_len,
+                                       &logstash_index[0], sizeof(logstash_index),
+                                       ctx->logstash_prefix_separator, &tm);
+            if (ret < 0) {
+                /* retry with the default separator */
+                compose_index_header(ctx, es_index_custom_len,
+                                     &logstash_index[0], sizeof(logstash_index),
+                                     "-", &tm);
+            }
+
+            es_index = logstash_index;
+            if (ctx->generate_id == FLB_FALSE) {
+                if (ctx->suppress_type_name) {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_WITHOUT_TYPE,
+                                                 ctx->es_action,
+                                                 es_index);
+                }
+                else {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT,
+                                                 ctx->es_action,
+                                                 es_index, ctx->type);
+                }
+            }
+        }
+        else if (ctx->current_time_index == FLB_TRUE) {
+            /* Make sure we handle the index time format for the index */
+            strftime(index_formatted, sizeof(index_formatted) - 1,
+                     ctx->index, &tm);
+            es_index = index_formatted;
+        }
+
+        /* Tag Key */
+        if (ctx->include_tag_key == FLB_TRUE) {
+            msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key));
+            msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
+            msgpack_pack_str(&tmp_pck, tag_len);
+            msgpack_pack_str_body(&tmp_pck, tag, tag_len);
+        }
+
+        /*
+         * The map_content routine iterates over each key/value pair found in
+         * the map and does some sanitization of the key names.
+         *
+         * Elasticsearch has a restriction that key names cannot contain
+         * a dot; if a dot is found, it's replaced with an underscore.
+         */
+        ret = es_pack_map_content(&tmp_pck, map, ctx);
+        if (ret == -1) {
+            flb_log_event_decoder_destroy(&log_decoder);
+            msgpack_sbuffer_destroy(&tmp_sbuf);
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+
+        if (ctx->generate_id == FLB_TRUE) {
+            MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
+            snprintf(es_uuid, sizeof(es_uuid),
+                     "%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
+                     hash[0], hash[1], hash[2], hash[3],
+                     hash[4], hash[5], hash[6], hash[7]);
+            if (ctx->suppress_type_name) {
+                index_len = flb_sds_snprintf(&j_index,
+                                             flb_sds_alloc(j_index),
+                                             ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                             ctx->es_action,
+                                             es_index, es_uuid);
+            }
+            else {
+                index_len = flb_sds_snprintf(&j_index,
+                                             flb_sds_alloc(j_index),
+                                             ES_BULK_INDEX_FMT_ID,
+                                             ctx->es_action,
+                                             es_index, ctx->type, es_uuid);
+            }
+        }
+        if (ctx->ra_id_key) {
+            id_key_str = es_get_id_value(ctx, &map);
+            if (id_key_str) {
+                if (ctx->suppress_type_name) {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
+                                                 ctx->es_action,
+                                                 es_index, id_key_str);
+                }
+                else {
+                    index_len = flb_sds_snprintf(&j_index,
+                                                 flb_sds_alloc(j_index),
+                                                 ES_BULK_INDEX_FMT_ID,
+                                                 ctx->es_action,
+                                                 es_index, ctx->type, id_key_str);
+                }
+                flb_sds_destroy(id_key_str);
+                id_key_str = NULL;
+            }
+        }
+
+        /* Convert msgpack to JSON */
+        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
+        msgpack_sbuffer_destroy(&tmp_sbuf);
+        if (!out_buf) {
+            flb_log_event_decoder_destroy(&log_decoder);
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+
+        out_buf_len = flb_sds_len(out_buf);
+        if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
+            tmp_buf = out_buf;
+            out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
+            out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
+            flb_sds_destroy(tmp_buf);
+        }
+        else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
+            tmp_buf = out_buf;
+            out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
+            out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
+            flb_sds_destroy(tmp_buf);
+        }
+
+        ret = es_bulk_append(bulk, j_index, index_len,
+                             out_buf, out_buf_len,
+                             bytes, off_prev);
+        flb_sds_destroy(out_buf);
+
+        off_prev = off;
+        if (ret == -1) {
+            /* We likely ran out of memory, abort here */
+            flb_log_event_decoder_destroy(&log_decoder);
+            *out_size = 0;
+            es_bulk_destroy(bulk);
+            flb_sds_destroy(j_index);
+            return -1;
+        }
+    }
+    flb_log_event_decoder_destroy(&log_decoder);
+
+    /* Set the outgoing data */
+    *out_data = bulk->ptr;
+    *out_size = bulk->len;
+
+    /*
+     * Note: we don't destroy the bulk as we need to keep the allocated
+     * buffer with the data. Instead we just release the bulk context and
+     * return the bulk->ptr buffer.
+     */
+    flb_free(bulk);
+    if (ctx->trace_output) {
+        fwrite(*out_data, 1, *out_size, stdout);
+        fflush(stdout);
+    }
+    flb_sds_destroy(j_index);
+    return 0;
+}
+
+static int cb_es_init(struct flb_output_instance *ins,
+                      struct flb_config *config,
+                      void *data)
+{
+    struct flb_elasticsearch *ctx;
+
+    ctx = flb_es_conf_create(ins, config);
+    if (!ctx) {
+        flb_plg_error(ins, "cannot initialize plugin");
+        return -1;
+    }
+
+    flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s",
+                  ins->host.name, ins->host.port, ctx->uri,
+                  ctx->index, ctx->type);
+
+    flb_output_set_context(ins, ctx);
+
+    /*
+     * This plugin instance uses the HTTP client interface, let's register
+     * its debugging callbacks.
+     */
+    flb_output_set_http_debug_callbacks(ins);
+
+    return 0;
+}
+
+static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
+                                     struct flb_http_client *c)
+{
+    int i, j, k;
+    int ret;
+    int check = FLB_FALSE;
+    int root_type;
+    char *out_buf;
+    size_t off = 0;
+    size_t out_size;
+    msgpack_unpacked result;
+    msgpack_object root;
+    msgpack_object key;
+    msgpack_object val;
+    msgpack_object item;
+    msgpack_object item_key;
+    msgpack_object item_val;
+
+    /*
+     * Check if our payload is complete: there are situations where
+     * the Elasticsearch HTTP response body is bigger than the HTTP client
+     * buffer, so the payload can be incomplete.
+     */
+    /* Convert the JSON payload to msgpack */
+    ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+                        &out_buf, &out_size, &root_type, NULL);
+    if (ret == -1) {
+        /* Is this an incomplete HTTP request? */
+        if (c->resp.payload_size <= 0) {
+            return FLB_TRUE;
+        }
+
+        /* Lookup the error field */
+        if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
+            return FLB_FALSE;
+        }
+
+        flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
+                      c->resp.payload);
+        return FLB_TRUE;
+    }
+
+    /* Lookup the error field */
+    msgpack_unpacked_init(&result);
+    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
+                      c->resp.payload);
+        flb_free(out_buf);
+        msgpack_unpacked_destroy(&result);
+        return FLB_TRUE;
+    }
+
+    root = result.data;
+    if (root.type != MSGPACK_OBJECT_MAP) {
+        flb_plg_error(ctx->ins, "unexpected payload type=%i",
+                      root.type);
+        check = FLB_TRUE;
+        goto done;
+    }
+
+    for (i = 0; i < root.via.map.size; i++) {
+        key = root.via.map.ptr[i].key;
+        if (key.type != MSGPACK_OBJECT_STR) {
+            flb_plg_error(ctx->ins, "unexpected key type=%i",
+                          key.type);
+            check = FLB_TRUE;
+            goto done;
+        }
+
+        if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) {
+            val = root.via.map.ptr[i].val;
+            if (val.type != MSGPACK_OBJECT_BOOLEAN) {
+                flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
+                              val.type);
+                check = FLB_TRUE;
+                goto done;
+            }
+
+            /* If errors == false, we are OK (no errors = FLB_FALSE) */
+            if (!val.via.boolean) {
+                /* no errors */
+                check = FLB_FALSE;
+                goto done;
+            }
+        }
+        else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) {
+            val = root.via.map.ptr[i].val;
+            if (val.type != MSGPACK_OBJECT_ARRAY) {
+                flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
+                              val.type);
+                check = FLB_TRUE;
+                goto done;
+            }
+
+            for (j = 0; j < val.via.array.size; j++) {
+                item = val.via.array.ptr[j];
+                if (item.type != MSGPACK_OBJECT_MAP) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
+                                  item.type);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                if (item.via.map.size != 1) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
+                                  item.via.map.size);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                item = item.via.map.ptr[0].val;
+                if (item.type != MSGPACK_OBJECT_MAP) {
+                    flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
+                                  item.type);
+                    check = FLB_TRUE;
+                    goto done;
+                }
+
+                for (k = 0; k < item.via.map.size; k++) {
+                    item_key = item.via.map.ptr[k].key;
+                    if (item_key.type != MSGPACK_OBJECT_STR) {
+                        flb_plg_error(ctx->ins, "unexpected key type=%i",
+                                      item_key.type);
+                        check = FLB_TRUE;
+                        goto done;
+                    }
+
+                    if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) {
+                        item_val = item.via.map.ptr[k].val;
+
+                        if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+                            flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
+                                          item_val.type);
+                            check = FLB_TRUE;
+                            goto done;
+                        }
+                        /* Check for errors other than version conflict (document already exists) */
+                        if (item_val.via.i64 != 409) {
+                            check = FLB_TRUE;
+                            goto done;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+ done:
+    flb_free(out_buf);
+    msgpack_unpacked_destroy(&result);
+    return check;
+}
+
+static void cb_es_flush(struct flb_event_chunk *event_chunk,
+                        struct flb_output_flush *out_flush,
+                        struct flb_input_instance *ins, void *out_context,
+                        struct flb_config *config)
+{
+    int ret;
+    size_t pack_size;
+    char *pack;
+    void *out_buf;
+    size_t out_size;
+    size_t b_sent;
+    struct flb_elasticsearch *ctx = out_context;
+    struct flb_connection *u_conn;
+    struct flb_http_client *c;
+    flb_sds_t signature = NULL;
+    int compressed = FLB_FALSE;
+
+    /* Get an upstream connection */
+    u_conn = flb_upstream_conn_get(ctx->u);
+    if (!u_conn) {
+        FLB_OUTPUT_RETURN(FLB_RETRY);
+    }
+
+    /* Convert the format */
+    ret = elasticsearch_format(config, ins,
+                               ctx, NULL,
+                               event_chunk->type,
+                               event_chunk->tag, flb_sds_len(event_chunk->tag),
+                               event_chunk->data, event_chunk->size,
+                               &out_buf, &out_size);
+    if (ret != 0) {
+        flb_upstream_conn_release(u_conn);
+        FLB_OUTPUT_RETURN(FLB_ERROR);
+    }
+
+    pack = (char *) out_buf;
+    pack_size = out_size;
+
+    /* Should we compress the payload? */
+    if (ctx->compress_gzip == FLB_TRUE) {
+        ret = flb_gzip_compress((void *) pack, pack_size,
+                                &out_buf, &out_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins,
+                          "cannot gzip payload, disabling compression");
+        }
+        else {
+            compressed = FLB_TRUE;
+        }
+
+        /*
+         * If the payload buffer is different from pack, we must free it.
+ */ + if (out_buf != pack) { + flb_free(pack); + } + + pack = (char *) out_buf; + pack_size = out_size; + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + pack, pack_size, NULL, 0, NULL, 0); + + flb_http_buffer_size(c, ctx->buffer_size); + +#ifndef FLB_HAVE_AWS + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); +#endif + + flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); + + if (ctx->http_user && ctx->http_passwd) { + flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + } + else if (ctx->cloud_user && ctx->cloud_passwd) { + flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd); + } + +#ifdef FLB_HAVE_AWS + if (ctx->has_aws_auth == FLB_TRUE) { + signature = add_aws_auth(c, ctx); + if (!signature) { + goto retry; + } + } + else { + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + } +#endif + + /* Content Encoding: gzip */ + if (compressed == FLB_TRUE) { + flb_http_set_content_encoding_gzip(c); + } + + /* Map debug callbacks */ + flb_http_client_debug(c, ctx->ins->callback); + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto retry; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", + c->resp.status, ctx->uri, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", + c->resp.status, ctx->uri); + } + goto retry; + } + + if (c->resp.payload_size > 0) { + /* + * Elasticsearch payload should be JSON, we convert it to msgpack + * and lookup the 'error' field. + */ + ret = elasticsearch_error_check(ctx, c); + if (ret == FLB_TRUE) { + /* we got an error */ + if (ctx->trace_error) { + /* + * If trace_error is set, trace the actual + * response from Elasticsearch explaining the problem. + * Trace_Output can be used to see the request. 
+                     */
+                    if (pack_size < 4000) {
+                        flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
+                                      (int) pack_size, pack);
+                    }
+                    if (c->resp.payload_size < 4000) {
+                        flb_plg_error(ctx->ins, "error: Output\n%s",
+                                      c->resp.payload);
+                    }
+                    else {
+                        /*
+                         * We must use fwrite since the flb_log functions
+                         * will truncate data at 4KB
+                         */
+                        fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
+                        fflush(stderr);
+                    }
+                }
+                goto retry;
+            }
+            else {
+                flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
+                              c->resp.payload);
+            }
+        }
+        else {
+            goto retry;
+        }
+    }
+
+    /* Cleanup */
+    flb_http_client_destroy(c);
+    flb_free(pack);
+    flb_upstream_conn_release(u_conn);
+    if (signature) {
+        flb_sds_destroy(signature);
+    }
+    FLB_OUTPUT_RETURN(FLB_OK);
+
+    /* Issue a retry */
+ retry:
+    flb_http_client_destroy(c);
+    flb_free(pack);
+
+    if (out_buf != pack) {
+        flb_free(out_buf);
+    }
+
+    flb_upstream_conn_release(u_conn);
+    FLB_OUTPUT_RETURN(FLB_RETRY);
+}
+
+static int cb_es_exit(void *data, struct flb_config *config)
+{
+    struct flb_elasticsearch *ctx = data;
+
+    flb_es_conf_destroy(ctx);
+    return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+    {
+     FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, index),
+     "Set an index name"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, type),
+     "Set the document type property"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, suppress_type_name),
+     "If true, the mapping type is removed (for v7.0.0 or later)"
+    },
+
+    /* HTTP Authentication */
+    {
+     FLB_CONFIG_MAP_STR, "http_user", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user),
+     "Optional username credential for Elastic X-Pack access"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "http_passwd", "",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd),
+     "Password for the user defined in HTTP_User"
+    },
+
+    /* HTTP Compression */
+    {
+     FLB_CONFIG_MAP_STR, "compress", NULL,
+     0, FLB_FALSE, 0,
+     "Set the payload compression mechanism. The only option available is 'gzip'"
+    },
+
+    /* Cloud Authentication */
+    {
+     FLB_CONFIG_MAP_STR, "cloud_id", NULL,
+     0, FLB_FALSE, 0,
+     "Elastic cloud ID of the cluster to connect to"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
+     0, FLB_FALSE, 0,
+     "Elastic cloud authentication credentials"
+    },
+
+    /* AWS Authentication */
+#ifdef FLB_HAVE_AWS
+    {
+     FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth),
+     "Enable AWS Sigv4 Authentication"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_region", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region),
+     "AWS Region of your Amazon OpenSearch Service cluster"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_sts_endpoint),
+     "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
+     0, FLB_FALSE, 0,
+     "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
+     0, FLB_FALSE, 0,
+     "External ID for the AWS IAM Role specified with `aws_role_arn`"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_service_name", "es",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_service_name),
+     "AWS Service Name"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "aws_profile", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_profile),
+     "AWS Profile name. AWS Profiles can be configured with the AWS CLI and are "
+     "usually stored in the $HOME/.aws/ directory."
+    },
+#endif
+
+    /* Logstash compatibility */
+    {
+     FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format),
+     "Enable Logstash format compatibility"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix),
+     "When Logstash_Format is enabled, the index name is composed using a prefix "
+     "and the date, e.g: if Logstash_Prefix is equal to 'mydata' your index will "
+     "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
+     "when the data is being generated"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_separator),
+     "Set a separator between Logstash_Prefix and the date."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key),
+     "When included: the value in the record that belongs to the key will be looked "
+     "up and overwrite the Logstash_Prefix for index generation. If the key/value "
+     "is not found in the record then the Logstash_Prefix option will act as a "
+     "fallback. Nested keys are supported through record accessor patterns"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat),
+     "Time format (based on strftime) to generate the second part of the index name"
+    },
+
+    /* Custom Time and Tag keys */
+    {
+     FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key),
+     "When Logstash_Format is enabled, each record will get a new timestamp field. "
+     "The Time_Key property defines the name of that field"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
+     "When Logstash_Format is enabled, this property defines the format of the "
+     "timestamp"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
+     "When Logstash_Format is enabled, enabling this property sends nanosecond "
+     "precision timestamps"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
+     "When enabled, it appends the tag name to the record"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key),
+     "When Include_Tag_Key is enabled, this property defines the key name for the tag"
+    },
+    {
+     FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size),
+     "Specify the buffer size used to read the response from the Elasticsearch HTTP "
+     "service. This option is useful for debugging purposes where it is required to "
+     "read full responses; note that the response size grows depending on the number "
+     "of records inserted. To use an unlimited amount of memory, set this value to "
+     "'false'; otherwise the value must follow the Unit Size specification"
+    },
+
+    /* Elasticsearch specifics */
+    {
+     FLB_CONFIG_MAP_STR, "path", NULL,
+     0, FLB_FALSE, 0,
+     "Elasticsearch accepts new data on the HTTP query path '/_bulk'. But it is also "
+     "possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
+     "option defines such a path on the fluent-bit side. It simply adds a path "
+     "prefix in the indexing HTTP POST URI"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "pipeline", NULL,
+     0, FLB_FALSE, 0,
+     "Newer versions of Elasticsearch allow setting up filters called pipelines. "
+     "This option allows defining which pipeline the database should use. For "
+     "performance reasons it is strongly suggested to do parsing and filtering on "
+     "the Fluent Bit side and avoid pipelines"
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "generate_id", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id),
+     "When enabled, generate an _id for outgoing records. This prevents duplicate "
+     "records when retrying ES"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "write_operation", "create",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation),
+     "Operation to use when writing in bulk requests"
+    },
+    {
+     FLB_CONFIG_MAP_STR, "id_key", NULL,
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
+     "If set, _id will be the value of the key from the incoming record."
+    },
+    {
+     FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
+     0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
+     "When enabled, replace field name dots with underscores, as required by Elasticsearch "
+     "2.0-2.3."
+ }, + + { + FLB_CONFIG_MAP_BOOL, "current_time_index", "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index), + "Use current time for index generation instead of message record" + }, + + /* Trace */ + { + FLB_CONFIG_MAP_BOOL, "trace_output", "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output), + "When enabled print the Elasticsearch API calls to stdout (for diag only)" + }, + { + FLB_CONFIG_MAP_BOOL, "trace_error", "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error), + "When enabled print the Elasticsearch exception to stderr (for diag only)" + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_output_plugin out_es_plugin = { + .name = "es", + .description = "Elasticsearch", + .cb_init = cb_es_init, + .cb_pre_run = NULL, + .cb_flush = cb_es_flush, + .cb_exit = cb_es_exit, + .workers = 2, + + /* Configuration */ + .config_map = config_map, + + /* Test */ + .test_formatter.callback = elasticsearch_format, + + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, +}; diff --git a/fluent-bit/plugins/out_es/es.h b/fluent-bit/plugins/out_es/es.h new file mode 100644 index 00000000..5d187049 --- /dev/null +++ b/fluent-bit/plugins/out_es/es.h @@ -0,0 +1,140 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_OUT_ES_H +#define FLB_OUT_ES_H + +#define FLB_ES_DEFAULT_HOST "127.0.0.1" +#define FLB_ES_DEFAULT_PORT 92000 +#define FLB_ES_DEFAULT_INDEX "fluent-bit" +#define FLB_ES_DEFAULT_TYPE "_doc" +#define FLB_ES_DEFAULT_PREFIX "logstash" +#define FLB_ES_DEFAULT_TIME_FMT "%Y.%m.%d" +#define FLB_ES_DEFAULT_TIME_KEY "@timestamp" +#define FLB_ES_DEFAULT_TIME_KEYF "%Y-%m-%dT%H:%M:%S" +#define FLB_ES_DEFAULT_TAG_KEY "flb-key" +#define FLB_ES_DEFAULT_HTTP_MAX "512k" +#define FLB_ES_DEFAULT_HTTPS_PORT 443 +#define FLB_ES_WRITE_OP_INDEX "index" +#define FLB_ES_WRITE_OP_CREATE "create" +#define FLB_ES_WRITE_OP_UPDATE "update" +#define FLB_ES_WRITE_OP_UPSERT "upsert" + +struct flb_elasticsearch { + /* Elasticsearch index (database) and type (table) */ + char *index; + char *type; + char suppress_type_name; + + /* HTTP Auth */ + char *http_user; + char *http_passwd; + + /* Elastic Cloud Auth */ + char *cloud_user; + char *cloud_passwd; + + /* AWS Auth */ +#ifdef FLB_HAVE_AWS + int has_aws_auth; + char *aws_region; + char *aws_sts_endpoint; + char *aws_profile; + struct flb_aws_provider *aws_provider; + struct flb_aws_provider *base_aws_provider; + /* tls instances can't be re-used; aws provider requires a separate one */ + struct flb_tls *aws_tls; + /* one for the standard chain provider, one for sts assume role */ + struct flb_tls *aws_sts_tls; + char *aws_session_name; + char *aws_service_name; + struct mk_list *aws_unsigned_headers; +#endif + + /* HTTP Client Setup */ + size_t buffer_size; + + /* + * If enabled, replace field name dots with underscore, required for + * Elasticsearch 2.0-2.3. + */ + int replace_dots; + + int trace_output; + int trace_error; + + /* + * Logstash compatibility options + * ============================== + */ + + /* enabled/disabled */ + int logstash_format; + int generate_id; + int current_time_index; + + /* prefix */ + flb_sds_t logstash_prefix; + flb_sds_t logstash_prefix_separator; + + /* prefix key */ + flb_sds_t logstash_prefix_key; + + /* date format */ + flb_sds_t logstash_dateformat; + + /* time key */ + flb_sds_t time_key; + + /* time key format */ + flb_sds_t time_key_format; + + /* time key nanoseconds */ + int time_key_nanos; + + + /* write operation */ + flb_sds_t write_operation; + /* write operation elasticsearch operation */ + flb_sds_t es_action; + + /* id_key */ + flb_sds_t id_key; + struct flb_record_accessor *ra_id_key; + + /* include_tag_key */ + int include_tag_key; + flb_sds_t tag_key; + + /* Elasticsearch HTTP API */ + char uri[256]; + + struct flb_record_accessor *ra_prefix_key; + + /* Compression mode (gzip) */ + int compress_gzip; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Plugin output instance reference */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/out_es/es_bulk.c b/fluent-bit/plugins/out_es/es_bulk.c new file mode 100644 index 00000000..221f45eb --- /dev/null +++ b/fluent-bit/plugins/out_es/es_bulk.c @@ -0,0 +1,113 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <math.h> + +#include <fluent-bit.h> +#include "es_bulk.h" + +struct es_bulk *es_bulk_create(size_t estimated_size) +{ + struct es_bulk *b; + + if (estimated_size < ES_BULK_CHUNK) { + estimated_size = ES_BULK_CHUNK; + } + + b = flb_malloc(sizeof(struct es_bulk)); + if (!b) { + perror("calloc"); + return NULL; + } + b->ptr = flb_malloc(estimated_size); + if (b->ptr == NULL) { + perror("malloc"); + flb_free(b); + return NULL; + } + + b->size = estimated_size; + b->len = 0; + + return b; +} + +void es_bulk_destroy(struct es_bulk *bulk) +{ + if (bulk->size > 0) { + flb_free(bulk->ptr); + } + flb_free(bulk); +} + +int es_bulk_append(struct es_bulk *bulk, char *index, int i_len, + char *json, size_t j_len, + size_t whole_size, size_t converted_size) +{ + int available; + int append_size; + int required; + int remaining_size; + char *ptr; + + required = i_len + j_len + ES_BULK_HEADER + 1; + available = (bulk->size - bulk->len); + + if (available < required) { + /* + * estimate a converted size of json + * calculate + * 1. rest of msgpack data size + * 2. ratio from bulk json size and processed msgpack size. + */ + append_size = required - available; + if (converted_size == 0) { + /* converted_size = 0 causes div/0 */ + flb_debug("[out_es] converted_size is 0"); + } else { + remaining_size = ceil((whole_size - converted_size) /* rest of size to convert */ + * ((double)bulk->size / converted_size)); /* = json size / msgpack size */ + append_size = fmax(append_size, remaining_size); + } + if (append_size < ES_BULK_CHUNK) { + /* append at least ES_BULK_CHUNK size */ + append_size = ES_BULK_CHUNK; + } + ptr = flb_realloc(bulk->ptr, bulk->size + append_size); + if (!ptr) { + flb_errno(); + return -1; + } + bulk->ptr = ptr; + bulk->size += append_size; + } + + memcpy(bulk->ptr + bulk->len, index, i_len); + bulk->len += i_len; + + memcpy(bulk->ptr + bulk->len, json, j_len); + bulk->len += j_len; + bulk->ptr[bulk->len] = '\n'; + bulk->len++; + + return 0; +}; diff --git a/fluent-bit/plugins/out_es/es_bulk.h b/fluent-bit/plugins/out_es/es_bulk.h new file mode 100644 index 00000000..7bb66dbb --- /dev/null +++ b/fluent-bit/plugins/out_es/es_bulk.h @@ -0,0 +1,46 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_OUT_ES_BULK_H +#define FLB_OUT_ES_BULK_H + +#include <inttypes.h> + +#define ES_BULK_CHUNK 4096 /* Size of buffer chunks */ +#define ES_BULK_HEADER 165 /* ES Bulk API prefix line */ +#define ES_BULK_INDEX_FMT "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_ID "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\"}}\n" +#define ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE "{\"%s\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n" +#define ES_BULK_UPDATE_OP_BODY "{\"doc\":%s}" +#define ES_BULK_UPSERT_OP_BODY "{\"doc_as_upsert\":true,\"doc\":%s}" + +struct es_bulk { + char *ptr; + uint32_t len; + uint32_t size; +}; + +struct es_bulk *es_bulk_create(size_t estimated_size); +int es_bulk_append(struct es_bulk *bulk, char *index, int i_len, + char *json, size_t j_len, + size_t whole_size, size_t curr_size); +void es_bulk_destroy(struct es_bulk *bulk); + +#endif diff --git a/fluent-bit/plugins/out_es/es_conf.c b/fluent-bit/plugins/out_es/es_conf.c new file mode 100644 index 00000000..48c8c3e2 --- /dev/null +++ b/fluent-bit/plugins/out_es/es_conf.c @@ -0,0 +1,537 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_record_accessor.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_base64.h> + +#include "es.h" +#include "es_conf.h" + +/* + * extract_cloud_host extracts the public hostname + * of a deployment from a Cloud ID string. + * + * The Cloud ID string has the format "<deployment_name>:<base64_info>". + * Once decoded, the "base64_info" string has the format "<deployment_region>$<elasticsearch_hostname>$<kibana_hostname>" + * and the function returns "<elasticsearch_hostname>.<deployment_region>" token. + */ +static flb_sds_t extract_cloud_host(struct flb_elasticsearch *ctx, + const char *cloud_id) +{ + + char *colon; + char *region; + char *host; + char *port = NULL; + char buf[256] = {0}; + char cloud_host_buf[256] = {0}; + const char dollar[2] = "$"; + size_t len; + int ret; + + /* keep only part after first ":" */ + colon = strchr(cloud_id, ':'); + if (colon == NULL) { + return NULL; + } + colon++; + + /* decode base64 */ + ret = flb_base64_decode((unsigned char *)buf, sizeof(buf), &len, (unsigned char *)colon, strlen(colon)); + if (ret) { + flb_plg_error(ctx->ins, "cannot decode cloud_id"); + return NULL; + } + region = strtok(buf, dollar); + if (region == NULL) { + return NULL; + } + host = strtok(NULL, dollar); + if (host == NULL) { + return NULL; + } + + /* + * Some cloud id format is "<deployment_region>$<elasticsearch_hostname>:<port>$<kibana_hostname>" . + * e.g. 
https://github.com/elastic/beats/blob/v8.4.1/libbeat/cloudid/cloudid_test.go#L60
+     *
+     * It means the variable "host" can contain ':' and a port number.
+     */
+    colon = strchr(host, ':');
+    if (colon != NULL) {
+        /* host contains a port number */
+        *colon = '\0'; /* remove the port number from host */
+        port = colon + 1;
+    }
+
+    strcpy(cloud_host_buf, host);
+    strcat(cloud_host_buf, ".");
+    strcat(cloud_host_buf, region);
+    if (port != NULL) {
+        strcat(cloud_host_buf, ":");
+        strcat(cloud_host_buf, port);
+    }
+    return flb_sds_create(cloud_host_buf);
+}
+
+/*
+ * set_cloud_credentials gets a cloud_auth string
+ * and sets the context's cloud_user and cloud_passwd.
+ * Example:
+ *   cloud_auth = elastic:ZXVyb3BxxxxxxZTA1Ng
+ *   ---->
+ *   cloud_user   = elastic
+ *   cloud_passwd = ZXVyb3BxxxxxxZTA1Ng
+ */
+static void set_cloud_credentials(struct flb_elasticsearch *ctx,
+                                  const char *cloud_auth)
+{
+    /* extract strings */
+    int items = 0;
+    struct mk_list *toks;
+    struct mk_list *head;
+    struct flb_split_entry *entry;
+
+    toks = flb_utils_split((const char *) cloud_auth, ':', -1);
+    mk_list_foreach(head, toks) {
+        items++;
+        entry = mk_list_entry(head, struct flb_split_entry, _head);
+        if (items == 1) {
+            ctx->cloud_user = flb_strdup(entry->value);
+        }
+        if (items == 2) {
+            ctx->cloud_passwd = flb_strdup(entry->value);
+        }
+    }
+    flb_utils_split_free(toks);
+}
+
+struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
+                                             struct flb_config *config)
+{
+    int len;
+    int io_flags = 0;
+    ssize_t ret;
+    char *buf;
+    const char *tmp;
+    const char *path;
+#ifdef FLB_HAVE_AWS
+    char *aws_role_arn = NULL;
+    char *aws_external_id = NULL;
+    char *aws_session_name = NULL;
+#endif
+    char *cloud_port_char;
+    char *cloud_host = NULL;
+    int cloud_host_port = 0;
+    int cloud_port = FLB_ES_DEFAULT_HTTPS_PORT;
+    struct flb_uri *uri = ins->host.uri;
+    struct flb_uri_field *f_index = NULL;
+    struct flb_uri_field *f_type = NULL;
+    struct flb_upstream *upstream;
+    struct flb_elasticsearch *ctx;
+
+    /* Allocate context */
+    ctx = flb_calloc(1, sizeof(struct flb_elasticsearch));
+    if (!ctx) {
+        flb_errno();
+        return NULL;
+    }
+    ctx->ins = ins;
+
+    if (uri) {
+        if (uri->count >= 2) {
+            f_index = flb_uri_get(uri, 0);
+            f_type  = flb_uri_get(uri, 1);
+        }
+    }
+
+    /* handle cloud_id */
+    tmp = flb_output_get_property("cloud_id", ins);
+    if (tmp) {
+        cloud_host = extract_cloud_host(ctx, tmp);
+        if (cloud_host == NULL) {
+            flb_plg_error(ctx->ins, "cannot extract cloud_host");
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+        flb_plg_debug(ctx->ins, "extracted cloud_host: '%s'", cloud_host);
+
+        cloud_port_char = strchr(cloud_host, ':');
+
+        if (cloud_port_char == NULL) {
+            flb_plg_debug(ctx->ins, "cloud_host '%s' does not contain a port",
+                          cloud_host);
+        }
+        else {
+            cloud_port_char[0] = '\0';
+            cloud_port_char = &cloud_port_char[1];
+            flb_plg_debug(ctx->ins, "extracted cloud_port_char: '%s'", cloud_port_char);
+            cloud_host_port = (int) strtol(cloud_port_char, (char **) NULL, 10);
+            flb_plg_debug(ctx->ins, "converted cloud_port_char to port int: '%i'", cloud_host_port);
+        }
+
+        if (cloud_host_port == 0) {
+            cloud_host_port = cloud_port;
+        }
+
+        flb_plg_debug(ctx->ins,
+                      "checked whether extracted port was null and set it to "
+                      "default https port or not. 
Outcome: '%i' and cloud_host: '%s'.", + cloud_host_port, cloud_host); + + if (ins->host.name != NULL) { + flb_sds_destroy(ins->host.name); + } + + ins->host.name = cloud_host; + ins->host.port = cloud_host_port; + } + + /* Set default network configuration */ + flb_output_net_default("127.0.0.1", 9200, ins); + + /* Populate context with config map defaults and incoming properties */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + flb_es_conf_destroy(ctx); + return NULL; + } + + /* handle cloud_auth */ + tmp = flb_output_get_property("cloud_auth", ins); + if (tmp) { + set_cloud_credentials(ctx, tmp); + } + + /* use TLS ? */ + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Compress (gzip) */ + tmp = flb_output_get_property("compress", ins); + ctx->compress_gzip = FLB_FALSE; + if (tmp) { + if (strcasecmp(tmp, "gzip") == 0) { + ctx->compress_gzip = FLB_TRUE; + } + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, + ins->tls); + if (!upstream) { + flb_plg_error(ctx->ins, "cannot create Upstream context"); + flb_es_conf_destroy(ctx); + return NULL; + } + ctx->u = upstream; + + /* Set instance flags into upstream */ + flb_output_upstream_set(ctx->u, ins); + + /* Set manual Index and Type */ + if (f_index) { + ctx->index = flb_strdup(f_index->value); /* FIXME */ + } + + if (f_type) { + ctx->type = flb_strdup(f_type->value); /* FIXME */ + } + + /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ + if (ctx->buffer_size == -1) { + ctx->buffer_size = 0; + } + + /* Elasticsearch: Path */ + path = flb_output_get_property("path", ins); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_output_get_property("pipeline", ins); + if (tmp) { + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + } + + if (ctx->id_key) { + ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); + if (ctx->ra_id_key == NULL) { + flb_plg_error(ins, "could not create record accessor for Id Key"); + } + if (ctx->generate_id == FLB_TRUE) { + flb_plg_warn(ins, "Generate_ID is ignored when ID_key is set"); + ctx->generate_id = FLB_FALSE; + } + } + + if (ctx->write_operation) { + if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) { + ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX); + } + else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) { + ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE); + } + else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0 + || strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { + ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE); + } + else { + flb_plg_error(ins, "wrong Write_Operation (should be one of index, create, update, upsert)"); + flb_es_conf_destroy(ctx); + return NULL; + } + if (strcasecmp(ctx->es_action, FLB_ES_WRITE_OP_UPDATE) == 0 + && !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) { + flb_plg_error(ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert"); + flb_es_conf_destroy(ctx); + return NULL; + } + } + + if (ctx->logstash_prefix_key) { + if (ctx->logstash_prefix_key[0] != '$') { + len = flb_sds_len(ctx->logstash_prefix_key); + buf = flb_malloc(len + 2); + if (!buf) { + 
+                flb_errno();
+                flb_es_conf_destroy(ctx);
+                return NULL;
+            }
+            buf[0] = '$';
+            memcpy(buf + 1, ctx->logstash_prefix_key, len);
+            buf[len + 1] = '\0';
+
+            ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
+            flb_free(buf);
+        }
+        else {
+            ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE);
+        }
+
+        if (!ctx->ra_prefix_key) {
+            flb_plg_error(ins, "invalid logstash_prefix_key pattern '%s'",
+                          ctx->logstash_prefix_key);
+            flb_es_conf_destroy(ctx);
+            return NULL;
+        }
+    }
+
+#ifdef FLB_HAVE_AWS
+    /* AWS Auth Unsigned Headers */
+    ctx->aws_unsigned_headers = flb_malloc(sizeof(struct mk_list));
+    if (!ctx->aws_unsigned_headers) {
+        flb_errno();
+        flb_es_conf_destroy(ctx);
+        return NULL;
+    }
+    flb_slist_create(ctx->aws_unsigned_headers);
+    ret = flb_slist_add(ctx->aws_unsigned_headers, "Content-Length");
+    if (ret != 0) {
+        flb_es_conf_destroy(ctx);
+        return NULL;
+    }
+
+    /* AWS Auth */
+    ctx->has_aws_auth = FLB_FALSE;
+    tmp = flb_output_get_property("aws_auth", ins);
+    if (tmp) {
+        if (strncasecmp(tmp, "On", 2) == 0) {
+            ctx->has_aws_auth = FLB_TRUE;
+            flb_debug("[out_es] Enabled AWS Auth");
+
+            /* AWS provider needs a separate TLS instance */
+            ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+                                          FLB_TRUE,
+                                          ins->tls_debug,
+                                          ins->tls_vhost,
+                                          ins->tls_ca_path,
+                                          ins->tls_ca_file,
+                                          ins->tls_crt_file,
+                                          ins->tls_key_file,
+                                          ins->tls_key_passwd);
+            if (!ctx->aws_tls) {
+                flb_errno();
+                flb_es_conf_destroy(ctx);
+                return NULL;
+            }
+
+            tmp = flb_output_get_property("aws_region", ins);
+            if (!tmp) {
+                flb_error("[out_es] aws_auth enabled but aws_region not set");
+                flb_es_conf_destroy(ctx);
+                return NULL;
+            }
+            ctx->aws_region = (char *) tmp;
+
+            tmp = flb_output_get_property("aws_sts_endpoint", ins);
+            if (tmp) {
+                ctx->aws_sts_endpoint = (char *) tmp;
+            }
+
+            ctx->aws_provider = flb_standard_chain_provider_create(config,
+                                                                   ctx->aws_tls,
+                                                                   ctx->aws_region,
+                                                                   ctx->aws_sts_endpoint,
+                                                                   NULL,
+                                                                   flb_aws_client_generator(),
+                                                                   ctx->aws_profile);
+            if (!ctx->aws_provider) {
+                flb_error("[out_es] Failed to create AWS Credential Provider");
+                flb_es_conf_destroy(ctx);
+                return NULL;
+            }
+
+            tmp = flb_output_get_property("aws_role_arn", ins);
+            if (tmp) {
+                /* Use the STS Provider */
+                ctx->base_aws_provider = ctx->aws_provider;
+                aws_role_arn = (char *) tmp;
+                aws_external_id = NULL;
+                tmp = flb_output_get_property("aws_external_id", ins);
+                if (tmp) {
+                    aws_external_id = (char *) tmp;
+                }
+
+                aws_session_name = flb_sts_session_name();
+                if (!aws_session_name) {
+                    flb_error("[out_es] Failed to create aws iam role "
+                              "session name");
+                    flb_es_conf_destroy(ctx);
+                    return NULL;
+                }
+
+                /* The STS provider needs yet another separate TLS instance */
+                ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+                                                  FLB_TRUE,
+                                                  ins->tls_debug,
+                                                  ins->tls_vhost,
+                                                  ins->tls_ca_path,
+                                                  ins->tls_ca_file,
+                                                  ins->tls_crt_file,
+                                                  ins->tls_key_file,
+                                                  ins->tls_key_passwd);
+                if (!ctx->aws_sts_tls) {
+                    flb_errno();
+                    flb_es_conf_destroy(ctx);
+                    return NULL;
+                }
+
+                ctx->aws_provider = flb_sts_provider_create(config,
+                                                            ctx->aws_sts_tls,
+                                                            ctx->base_aws_provider,
+                                                            aws_external_id,
+                                                            aws_role_arn,
+                                                            aws_session_name,
+                                                            ctx->aws_region,
+                                                            ctx->aws_sts_endpoint,
+                                                            NULL,
+                                                            flb_aws_client_generator());
+                /* The session name can be freed once the provider is created */
+                flb_free(aws_session_name);
+                if (!ctx->aws_provider) {
+                    flb_error("[out_es] Failed to create AWS STS Credential "
+                              "Provider");
+                    flb_es_conf_destroy(ctx);
+                    return NULL;
+                }
+            }
+
+            /* initialize credentials in sync mode */
+            ctx->aws_provider->provider_vtable->sync(ctx->aws_provider);
ctx->aws_provider->provider_vtable->init(ctx->aws_provider); + /* set back to async */ + ctx->aws_provider->provider_vtable->async(ctx->aws_provider); + ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins); + } + } +#endif + + return ctx; +} + +int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +{ + if (!ctx) { + return 0; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + if (ctx->ra_id_key) { + flb_ra_destroy(ctx->ra_id_key); + ctx->ra_id_key = NULL; + } + if (ctx->es_action) { + flb_free(ctx->es_action); + } + +#ifdef FLB_HAVE_AWS + if (ctx->base_aws_provider) { + flb_aws_provider_destroy(ctx->base_aws_provider); + } + + if (ctx->aws_provider) { + flb_aws_provider_destroy(ctx->aws_provider); + } + + if (ctx->aws_tls) { + flb_tls_destroy(ctx->aws_tls); + } + + if (ctx->aws_sts_tls) { + flb_tls_destroy(ctx->aws_sts_tls); + } + + if (ctx->aws_unsigned_headers) { + flb_slist_destroy(ctx->aws_unsigned_headers); + flb_free(ctx->aws_unsigned_headers); + } +#endif + + if (ctx->ra_prefix_key) { + flb_ra_destroy(ctx->ra_prefix_key); + } + + flb_free(ctx->cloud_passwd); + flb_free(ctx->cloud_user); + flb_free(ctx); + + return 0; +} diff --git a/fluent-bit/plugins/out_es/es_conf.h b/fluent-bit/plugins/out_es/es_conf.h new file mode 100644 index 00000000..3c421bec --- /dev/null +++ b/fluent-bit/plugins/out_es/es_conf.h @@ -0,0 +1,33 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_ES_CONF_H +#define FLB_OUT_ES_CONF_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_config.h> + +#include "es.h" + +struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +#endif diff --git a/fluent-bit/plugins/out_es/murmur3.c b/fluent-bit/plugins/out_es/murmur3.c new file mode 100644 index 00000000..8fccb6de --- /dev/null +++ b/fluent-bit/plugins/out_es/murmur3.c @@ -0,0 +1,314 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. + +// Note - The x86 and x64 versions do _not_ produce the same results, as the +// algorithms are optimized for their respective platforms. You can still +// compile and run any of them on any platform, but your performance with the +// non-native version will be less than optimal. 
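+// In this plugin, MurmurHash3_x64_128 (seed 42) hashes each serialized record
+// and the 128-bit result is formatted as a UUID-like string for the document
+// _id; see the Generate_ID path in es.c.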
+ +#include "murmur3.h" + +//----------------------------------------------------------------------------- +// Platform-specific functions and macros + +#ifdef __GNUC__ +#define FORCE_INLINE __attribute__((always_inline)) inline +#else +#define FORCE_INLINE inline +#endif + +static FORCE_INLINE uint32_t rotl32 ( uint32_t x, int8_t r ) +{ + return (x << r) | (x >> (32 - r)); +} + +static FORCE_INLINE uint64_t rotl64 ( uint64_t x, int8_t r ) +{ + return (x << r) | (x >> (64 - r)); +} + +#define ROTL32(x,y) rotl32(x,y) +#define ROTL64(x,y) rotl64(x,y) + +#define BIG_CONSTANT(x) (x##LLU) + +//----------------------------------------------------------------------------- +// Block read - if your platform needs to do endian-swapping or can only +// handle aligned reads, do the conversion here + +#define getblock(p, i) (p[i]) + +//----------------------------------------------------------------------------- +// Finalization mix - force all bits of a hash block to avalanche + +static FORCE_INLINE uint32_t fmix32 ( uint32_t h ) +{ + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + + return h; +} + +//---------- + +static FORCE_INLINE uint64_t fmix64 ( uint64_t k ) +{ + k ^= k >> 33; + k *= BIG_CONSTANT(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + + return k; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 ( const void * key, int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 4; + int i; + + uint32_t h1 = seed; + + uint32_t c1 = 0xcc9e2d51; + uint32_t c2 = 0x1b873593; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*4); + + for(i = -nblocks; i; i++) + { + uint32_t k1 = getblock(blocks,i); + + k1 *= c1; + k1 = ROTL32(k1,15); + k1 *= c2; + + h1 ^= k1; + h1 = ROTL32(h1,13); + h1 = h1*5+0xe6546b64; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*4); + + uint32_t k1 = 0; + + switch(len & 3) + { + case 3: k1 ^= tail[2] << 16; + case 2: k1 ^= tail[1] << 8; + case 1: k1 ^= tail[0]; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; + + h1 = fmix32(h1); + + *(uint32_t*)out = h1; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_128 ( const void * key, const int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + int i; + + uint32_t h1 = seed; + uint32_t h2 = seed; + uint32_t h3 = seed; + uint32_t h4 = seed; + + uint32_t c1 = 0x239b961b; + uint32_t c2 = 0xab0e9789; + uint32_t c3 = 0x38b34ae5; + uint32_t c4 = 0xa1e38b93; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*16); + + for(i = -nblocks; i; i++) + { + uint32_t k1 = getblock(blocks,i*4+0); + uint32_t k2 = getblock(blocks,i*4+1); + uint32_t k3 = getblock(blocks,i*4+2); + uint32_t k4 = getblock(blocks,i*4+3); + + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + + h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b; + + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747; + + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35; + + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17; 
+ } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint32_t k1 = 0; + uint32_t k2 = 0; + uint32_t k3 = 0; + uint32_t k4 = 0; + + switch(len & 15) + { + case 15: k4 ^= tail[14] << 16; + case 14: k4 ^= tail[13] << 8; + case 13: k4 ^= tail[12] << 0; + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + case 12: k3 ^= tail[11] << 24; + case 11: k3 ^= tail[10] << 16; + case 10: k3 ^= tail[ 9] << 8; + case 9: k3 ^= tail[ 8] << 0; + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + case 8: k2 ^= tail[ 7] << 24; + case 7: k2 ^= tail[ 6] << 16; + case 6: k2 ^= tail[ 5] << 8; + case 5: k2 ^= tail[ 4] << 0; + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + case 4: k1 ^= tail[ 3] << 24; + case 3: k1 ^= tail[ 2] << 16; + case 2: k1 ^= tail[ 1] << 8; + case 1: k1 ^= tail[ 0] << 0; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len; + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + h1 = fmix32(h1); + h2 = fmix32(h2); + h3 = fmix32(h3); + h4 = fmix32(h4); + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + ((uint32_t*)out)[0] = h1; + ((uint32_t*)out)[1] = h2; + ((uint32_t*)out)[2] = h3; + ((uint32_t*)out)[3] = h4; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x64_128 ( const void * key, const int len, + const uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + int i; + + uint64_t h1 = seed; + uint64_t h2 = seed; + + uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5); + uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f); + + //---------- + // body + + const uint64_t * blocks = (const uint64_t *)(data); + + for(i = 0; i < nblocks; i++) + { + uint64_t k1 = getblock(blocks,i*2+0); + uint64_t k2 = getblock(blocks,i*2+1); + + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + + h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729; + + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint64_t k1 = 0; + uint64_t k2 = 0; + + switch(len & 15) + { + case 15: k2 ^= (uint64_t)(tail[14]) << 48; + case 14: k2 ^= (uint64_t)(tail[13]) << 40; + case 13: k2 ^= (uint64_t)(tail[12]) << 32; + case 12: k2 ^= (uint64_t)(tail[11]) << 24; + case 11: k2 ^= (uint64_t)(tail[10]) << 16; + case 10: k2 ^= (uint64_t)(tail[ 9]) << 8; + case 9: k2 ^= (uint64_t)(tail[ 8]) << 0; + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + case 8: k1 ^= (uint64_t)(tail[ 7]) << 56; + case 7: k1 ^= (uint64_t)(tail[ 6]) << 48; + case 6: k1 ^= (uint64_t)(tail[ 5]) << 40; + case 5: k1 ^= (uint64_t)(tail[ 4]) << 32; + case 4: k1 ^= (uint64_t)(tail[ 3]) << 24; + case 3: k1 ^= (uint64_t)(tail[ 2]) << 16; + case 2: k1 ^= (uint64_t)(tail[ 1]) << 8; + case 1: k1 ^= (uint64_t)(tail[ 0]) << 0; + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + ((uint64_t*)out)[0] = h1; + ((uint64_t*)out)[1] = h2; +} + +//----------------------------------------------------------------------------- diff --git a/fluent-bit/plugins/out_es/murmur3.h b/fluent-bit/plugins/out_es/murmur3.h new file mode 100644 index 00000000..c85395a1 --- /dev/null +++ 
b/fluent-bit/plugins/out_es/murmur3.h @@ -0,0 +1,29 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. The author hereby disclaims copyright to this source +// code. + +#ifndef _MURMURHASH3_H_ +#define _MURMURHASH3_H_ + +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 (const void *key, int len, uint32_t seed, void *out); + +void MurmurHash3_x86_128(const void *key, int len, uint32_t seed, void *out); + +void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out); + +//----------------------------------------------------------------------------- + +#ifdef __cplusplus +} +#endif + +#endif // _MURMURHASH3_H_
\ No newline at end of file
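
A minimal sketch of how a caller can consume the 128-bit variant declared above, e.g. to build a hex document id; the record payload and the seed value are illustrative assumptions, not values taken from es.c:

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "murmur3.h"

int main(void)
{
    /* Illustrative payload; in the plugin this would be the serialized record */
    const char *record = "{\"log\":\"hello world\"}";
    uint64_t hash[2];   /* 128-bit output, written as two 64-bit words */
    char id[33];        /* 32 hex chars + terminating NUL */
    int i;

    MurmurHash3_x64_128(record, (int) strlen(record), 42, hash);

    /* Render the 16 output bytes as a lowercase hex string */
    for (i = 0; i < 16; i++) {
        snprintf(id + i * 2, 3, "%02x", (unsigned int) ((uint8_t *) hash)[i]);
    }
    printf("_id=%s\n", id);
    return 0;
}

The same input always yields the same id on a given platform, which is what makes it usable for idempotent writes; note the header's own caveat that the x86 and x64 variants do not produce identical results.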
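For reference, the AWS-related properties read in es_conf.c above (aws_auth, aws_region, aws_sts_endpoint, aws_role_arn, aws_external_id) map onto an output section along these lines; the host, region, role ARN and external id below are placeholders, not values from this patch:

[OUTPUT]
    Name             es
    Match            *
    Host             vpc-demo.us-east-1.es.amazonaws.com
    Port             443
    TLS              On
    AWS_Auth         On
    AWS_Region       us-east-1
    AWS_Role_ARN     arn:aws:iam::111122223333:role/es-writer
    AWS_External_ID  demo-external-id

AWS_Role_ARN is optional; when present, the code path above wraps the standard credential chain in the STS provider, and AWS_External_ID is only consulted in that case.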