From 58daab21cd043e1dc37024a7f99b396788372918 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:48 +0100 Subject: Merging upstream version 1.44.3. Signed-off-by: Daniel Baumann --- fluent-bit/plugins/out_kafka/kafka.c | 658 +++++++++++++++++++++++++++++++++++ 1 file changed, 658 insertions(+) create mode 100644 fluent-bit/plugins/out_kafka/kafka.c (limited to 'fluent-bit/plugins/out_kafka/kafka.c') diff --git a/fluent-bit/plugins/out_kafka/kafka.c b/fluent-bit/plugins/out_kafka/kafka.c new file mode 100644 index 000000000..ff700a687 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka.c @@ -0,0 +1,658 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "kafka_config.h" +#include "kafka_topic.h" + +void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, + void *opaque) +{ + struct flb_out_kafka *ctx = (struct flb_out_kafka *) opaque; + + if (rkmessage->err) { + flb_plg_warn(ctx->ins, "message delivery failed: %s", + rd_kafka_err2str(rkmessage->err)); + } + else { + flb_plg_debug(ctx->ins, "message delivered (%zd bytes, " + "partition %"PRId32")", + rkmessage->len, rkmessage->partition); + } +} + +void cb_kafka_logger(const rd_kafka_t *rk, int level, + const char *fac, const char *buf) +{ + struct flb_out_kafka *ctx; + + ctx = (struct flb_out_kafka *) rd_kafka_opaque(rk); + + if (level <= FLB_KAFKA_LOG_ERR) { + flb_plg_error(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_WARNING) { + flb_plg_warn(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_NOTICE || level == FLB_KAFKA_LOG_INFO) { + flb_plg_info(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_DEBUG) { + flb_plg_debug(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } +} + +static int cb_kafka_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + struct flb_out_kafka *ctx; + + /* Configuration */ + ctx = flb_out_kafka_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "failed to initialize"); + return -1; + } + + /* Set global context */ + flb_output_set_context(ins, ctx); + return 0; +} + +int produce_message(struct flb_time *tm, msgpack_object *map, + struct flb_out_kafka *ctx, struct flb_config *config) +{ + int i; + int ret; + int size; + int queue_full_retries = 0; + char *out_buf; + size_t out_size; + struct mk_list *head; + struct mk_list *topics; + struct flb_split_entry *entry; + char *dynamic_topic; + char *message_key = NULL; + size_t message_key_len = 0; + struct flb_kafka_topic *topic = NULL; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_object key; + msgpack_object val; + flb_sds_t s; + +#ifdef FLB_HAVE_AVRO_ENCODER + // used to flag when a buffer needs to be freed for avro + bool avro_fast_buffer = true; + + // avro encoding uses a buffer + // the majority of lines are fairly small + // so using static buffer for these is much more efficient + // larger sizes will allocate +#ifndef AVRO_DEFAULT_BUFFER_SIZE +#define AVRO_DEFAULT_BUFFER_SIZE 2048 +#endif + static char avro_buff[AVRO_DEFAULT_BUFFER_SIZE]; + + // don't take lines that are too large + // these lines will log a warning + // this roughly a log line of 250000 chars +#ifndef AVRO_LINE_MAX_LEN +#define AVRO_LINE_MAX_LEN 1000000 + + // this is a convenience +#define AVRO_FREE(X, Y) if (!X) { flb_free(Y); } +#endif + + // this is just to keep the code cleaner + // the avro encoding includes + // an embedded schemaid which is used + // the embedding is a null byte + // followed by a 16 byte schemaid +#define AVRO_SCHEMA_OVERHEAD 16 + 1 +#endif + + flb_debug("in produce_message\n"); + if (flb_log_check(FLB_LOG_DEBUG)) + msgpack_object_print(stderr, *map); + + /* Init temporal buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) { + /* Make room for the timestamp */ + size = map->via.map.size + 1; + msgpack_pack_map(&mp_pck, size); + + /* Pack timestamp */ + msgpack_pack_str(&mp_pck, ctx->timestamp_key_len); + msgpack_pack_str_body(&mp_pck, + ctx->timestamp_key, ctx->timestamp_key_len); + switch (ctx->timestamp_format) { + case FLB_JSON_DATE_DOUBLE: + msgpack_pack_double(&mp_pck, flb_time_to_double(tm)); + break; + + case FLB_JSON_DATE_ISO8601: + case FLB_JSON_DATE_ISO8601_NS: + { + size_t date_len; + int len; + struct tm _tm; + char time_formatted[36]; + + /* Format the time; use microsecond precision (not nanoseconds). */ + gmtime_r(&tm->tm.tv_sec, &_tm); + date_len = strftime(time_formatted, sizeof(time_formatted) - 1, + FLB_JSON_DATE_ISO8601_FMT, &_tm); + + if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) { + len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, + ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000); + } + else { + /* FLB_JSON_DATE_ISO8601_NS */ + len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, + ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec); + } + date_len += len; + + msgpack_pack_str(&mp_pck, date_len); + msgpack_pack_str_body(&mp_pck, time_formatted, date_len); + } + break; + } + } + else { + size = map->via.map.size; + msgpack_pack_map(&mp_pck, size); + } + + for (i = 0; i < map->via.map.size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + + msgpack_pack_object(&mp_pck, key); + msgpack_pack_object(&mp_pck, val); + + /* Lookup message key */ + if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->message_key_field_len && + strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) { + message_key = (char *) val.via.str.ptr; + message_key_len = val.via.str.size; + } + } + + /* Lookup key/topic */ + if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->topic_key_len && + strncmp(key.via.str.ptr, ctx->topic_key, ctx->topic_key_len) == 0) { + topic = flb_kafka_topic_lookup((char *) val.via.str.ptr, + val.via.str.size, ctx); + /* Add extracted topic on the fly to topiclist */ + if (ctx->dynamic_topic) { + /* Only if default topic is set and this topicname is not set for this message */ + if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 && + (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) { + if (memchr(val.via.str.ptr, ',', val.via.str.size)) { + /* Don't allow commas in kafkatopic name */ + flb_warn("',' not allowed in dynamic_kafka topic names"); + continue; + } + if (val.via.str.size > 64) { + /* Don't allow length of dynamic kafka topics > 64 */ + flb_warn(" dynamic kafka topic length > 64 not allowed"); + continue; + } + dynamic_topic = flb_malloc(val.via.str.size + 1); + if (!dynamic_topic) { + /* Use default topic */ + flb_errno(); + continue; + } + strncpy(dynamic_topic, val.via.str.ptr, val.via.str.size); + dynamic_topic[val.via.str.size] = '\0'; + topics = flb_utils_split(dynamic_topic, ',', 0); + if (!topics) { + /* Use the default topic */ + flb_errno(); + flb_free(dynamic_topic); + continue; + } + mk_list_foreach(head, topics) { + /* Add the (one) found topicname to the topic configuration */ + entry = mk_list_entry(head, struct flb_split_entry, _head); + topic = flb_kafka_topic_create(entry->value, ctx); + if (!topic) { + /* Use default topic */ + flb_error("[out_kafka] cannot register topic '%s'", + entry->value); + topic = flb_kafka_topic_lookup((char *) val.via.str.ptr, + val.via.str.size, ctx); + } + else { + flb_info("[out_kafka] new topic added: %s", dynamic_topic); + } + } + flb_free(dynamic_topic); + } + } + } + } + } + + if (ctx->format == FLB_KAFKA_FMT_JSON) { + s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (!s) { + flb_plg_error(ctx->ins, "error encoding to JSON"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = s; + out_size = flb_sds_len(out_buf); + } + else if (ctx->format == FLB_KAFKA_FMT_MSGP) { + out_buf = mp_sbuf.data; + out_size = mp_sbuf.size; + } + else if (ctx->format == FLB_KAFKA_FMT_GELF) { + s = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size, + tm, &(ctx->gelf_fields)); + if (s == NULL) { + flb_plg_error(ctx->ins, "error encoding to GELF"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = s; + out_size = flb_sds_len(s); + } +#ifdef FLB_HAVE_AVRO_ENCODER + else if (ctx->format == FLB_KAFKA_FMT_AVRO) { + + flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id); + flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str); + + // if there's no data then log it and return + if (mp_sbuf.size == 0) { + flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; + } + + // is the line is too long log it and return + if (mp_sbuf.size > AVRO_LINE_MAX_LEN) { + flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; + } + + flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id); + out_buf = avro_buff; + out_size = AVRO_DEFAULT_BUFFER_SIZE; + + if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) { + flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id); + avro_fast_buffer = false; + // avro will always be smaller than msgpack + // it contains no meta-info aside from the schemaid + // all the metadata is in the schema which is not part of the msg + // add schemaid + magic byte for safety buffer and allocate + // that's 16 byte schemaid and one byte magic byte + out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD; + out_buf = flb_malloc(out_size); + if (!out_buf) { + flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + } + + if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) { + flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!avro_fast_buffer) { + flb_free(out_buf); + } + return FLB_ERROR; + } + + } +#endif + + if (!message_key) { + message_key = ctx->message_key; + message_key_len = ctx->message_key_len; + } + + if (!topic) { + topic = flb_kafka_topic_default(ctx); + } + if (!topic) { + flb_plg_error(ctx->ins, "no default topic found"); + msgpack_sbuffer_destroy(&mp_sbuf); +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + return FLB_ERROR; + } + + retry: + /* + * If the local rdkafka queue is full, we retry up to 'queue_full_retries' + * times set by the configuration (default: 10). If the configuration + * property was set to '0' or 'false', we don't impose a limit. Use that + * value under your own risk. + */ + if (ctx->queue_full_retries > 0 && + queue_full_retries >= ctx->queue_full_retries) { + if (ctx->format != FLB_KAFKA_FMT_MSGP) { + flb_sds_destroy(s); + } + msgpack_sbuffer_destroy(&mp_sbuf); +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + /* + * Unblock the flush requests so that the + * engine could try sending data again. + */ + ctx->blocked = FLB_FALSE; + return FLB_RETRY; + } + + ret = rd_kafka_produce(topic->tp, + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, + out_buf, out_size, + message_key, message_key_len, + ctx); + + if (ret == -1) { + flb_error( + "%% Failed to produce to topic %s: %s\n", + rd_kafka_topic_name(topic->tp), + rd_kafka_err2str(rd_kafka_last_error())); + + /* + * rdkafka queue is full, keep trying 'locally' for a few seconds, + * otherwise let the caller to issue a main retry againt the engine. + */ + if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + flb_plg_warn(ctx->ins, + "internal queue is full, retrying in one second"); + + /* + * If the queue is full, first make sure to discard any further + * flush request from the engine. This means 'the caller will + * issue a retry at a later time'. + */ + ctx->blocked = FLB_TRUE; + + /* + * Next step is to give it some time to the background rdkafka + * library to do it own work. By default rdkafka wait 1 second + * or up to 10000 messages to be enqueued before delivery. + * + * If the kafka broker is down we should try a couple of times + * to enqueue this message, if we exceed 10 times, we just + * issue a full retry of the data chunk. + */ + flb_time_sleep(1000); + rd_kafka_poll(ctx->kafka.rk, 0); + + /* Issue a re-try */ + queue_full_retries++; + goto retry; + } + } + else { + flb_plg_debug(ctx->ins, "enqueued message (%zd bytes) for topic '%s'", + out_size, rd_kafka_topic_name(topic->tp)); + } + ctx->blocked = FLB_FALSE; + + rd_kafka_poll(ctx->kafka.rk, 0); + if (ctx->format == FLB_KAFKA_FMT_JSON) { + flb_sds_destroy(s); + } + if (ctx->format == FLB_KAFKA_FMT_GELF) { + flb_sds_destroy(s); + } +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; +} + +static void cb_kafka_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + + int ret; + struct flb_out_kafka *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + /* + * If the context is blocked, means rdkafka queue is full and no more + * messages can be appended. For our called (Fluent Bit engine) means + * that is not possible to work on this now and it need to 'retry'. + */ + if (ctx->blocked == FLB_TRUE) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Iterate the original buffer and perform adjustments */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + ret = produce_message(&log_event.timestamp, + log_event.body, + ctx, config); + + if (ret != FLB_OK) { + flb_log_event_decoder_destroy(&log_decoder); + + FLB_OUTPUT_RETURN(ret); + } + } + + flb_log_event_decoder_destroy(&log_decoder); + + FLB_OUTPUT_RETURN(FLB_OK); +} + +static void kafka_flush_force(struct flb_out_kafka *ctx, + struct flb_config *config) +{ + int ret; + + if (!ctx) { + return; + } + + if (ctx->kafka.rk) { + ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { + flb_plg_warn(ctx->ins, "Failed to force flush: %s", + rd_kafka_err2str(ret)); + } + } +} + +static int cb_kafka_exit(void *data, struct flb_config *config) +{ + struct flb_out_kafka *ctx = data; + + kafka_flush_force(ctx, config); + flb_out_kafka_destroy(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "topic_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, topic_key), + "Which record to use as the kafka topic." + }, + { + FLB_CONFIG_MAP_BOOL, "dynamic_topic", "false", + 0, FLB_TRUE, offsetof(struct flb_out_kafka, dynamic_topic), + "Activate dynamic topics." + }, + { + FLB_CONFIG_MAP_STR, "format", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str), + "Set the record output format." + }, + { + FLB_CONFIG_MAP_STR, "message_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key), + "Which record key to use as the message data." + }, + { + FLB_CONFIG_MAP_STR, "message_key_field", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key_field), + "Which record key field to use as the message data." + }, + { + FLB_CONFIG_MAP_STR, "timestamp_key", FLB_KAFKA_TS_KEY, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_key), + "Set the key for the the timestamp." + }, + { + FLB_CONFIG_MAP_STR, "timestamp_format", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str), + "Set the format the timestamp is in." + }, + { + FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries), + "Set the number of local retries to enqueue the data." + }, + { + FLB_CONFIG_MAP_STR, "gelf_timestamp_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the timestamp key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_host_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the host key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_short_message_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the short message key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_full_message_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the full message key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_level_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the level key for gelf output." + }, +#ifdef FLB_HAVE_AVRO_ENCODER + { + FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL, + 0, FLB_FALSE, 0, + "Set AVRO schema." + }, + { + FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set AVRO schema ID." + }, +#endif + { + FLB_CONFIG_MAP_STR, "topics", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka topics, delimited by commas." + }, + { + FLB_CONFIG_MAP_STR, "brokers", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka brokers, delimited by commas." + }, + { + FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka client_id." + }, + { + FLB_CONFIG_MAP_STR, "group_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka group_id." + }, + { + FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL, + //FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, rdkafka_opts), + 0, FLB_FALSE, 0, + "Set the kafka group_id." + }, + /* EOF */ + {0} +}; + +struct flb_output_plugin out_kafka_plugin = { + .name = "kafka", + .description = "Kafka", + .cb_init = cb_kafka_init, + .cb_flush = cb_kafka_flush, + .cb_exit = cb_kafka_exit, + .config_map = config_map, + .flags = 0 +}; -- cgit v1.2.3