summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_kafka/kafka.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/out_kafka/kafka.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_kafka/kafka.c')
-rw-r--r--fluent-bit/plugins/out_kafka/kafka.c658
1 files changed, 658 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka/kafka.c b/fluent-bit/plugins/out_kafka/kafka.c
new file mode 100644
index 000000000..ff700a687
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka.c
@@ -0,0 +1,658 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "kafka_config.h"
+#include "kafka_topic.h"
+
+void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
+ void *opaque)
+{
+ struct flb_out_kafka *ctx = (struct flb_out_kafka *) opaque;
+
+ if (rkmessage->err) {
+ flb_plg_warn(ctx->ins, "message delivery failed: %s",
+ rd_kafka_err2str(rkmessage->err));
+ }
+ else {
+ flb_plg_debug(ctx->ins, "message delivered (%zd bytes, "
+ "partition %"PRId32")",
+ rkmessage->len, rkmessage->partition);
+ }
+}
+
+void cb_kafka_logger(const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf)
+{
+ struct flb_out_kafka *ctx;
+
+ ctx = (struct flb_out_kafka *) rd_kafka_opaque(rk);
+
+ if (level <= FLB_KAFKA_LOG_ERR) {
+ flb_plg_error(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_WARNING) {
+ flb_plg_warn(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_NOTICE || level == FLB_KAFKA_LOG_INFO) {
+ flb_plg_info(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_DEBUG) {
+ flb_plg_debug(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+}
+
+static int cb_kafka_init(struct flb_output_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ struct flb_out_kafka *ctx;
+
+ /* Configuration */
+ ctx = flb_out_kafka_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "failed to initialize");
+ return -1;
+ }
+
+ /* Set global context */
+ flb_output_set_context(ins, ctx);
+ return 0;
+}
+
+int produce_message(struct flb_time *tm, msgpack_object *map,
+ struct flb_out_kafka *ctx, struct flb_config *config)
+{
+ int i;
+ int ret;
+ int size;
+ int queue_full_retries = 0;
+ char *out_buf;
+ size_t out_size;
+ struct mk_list *head;
+ struct mk_list *topics;
+ struct flb_split_entry *entry;
+ char *dynamic_topic;
+ char *message_key = NULL;
+ size_t message_key_len = 0;
+ struct flb_kafka_topic *topic = NULL;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ msgpack_object key;
+ msgpack_object val;
+ flb_sds_t s;
+
+#ifdef FLB_HAVE_AVRO_ENCODER
+ // used to flag when a buffer needs to be freed for avro
+ bool avro_fast_buffer = true;
+
+ // avro encoding uses a buffer
+ // the majority of lines are fairly small
+ // so using static buffer for these is much more efficient
+ // larger sizes will allocate
+#ifndef AVRO_DEFAULT_BUFFER_SIZE
+#define AVRO_DEFAULT_BUFFER_SIZE 2048
+#endif
+ static char avro_buff[AVRO_DEFAULT_BUFFER_SIZE];
+
+ // don't take lines that are too large
+ // these lines will log a warning
+ // this roughly a log line of 250000 chars
+#ifndef AVRO_LINE_MAX_LEN
+#define AVRO_LINE_MAX_LEN 1000000
+
+ // this is a convenience
+#define AVRO_FREE(X, Y) if (!X) { flb_free(Y); }
+#endif
+
+ // this is just to keep the code cleaner
+ // the avro encoding includes
+ // an embedded schemaid which is used
+ // the embedding is a null byte
+ // followed by a 16 byte schemaid
+#define AVRO_SCHEMA_OVERHEAD 16 + 1
+#endif
+
+ flb_debug("in produce_message\n");
+ if (flb_log_check(FLB_LOG_DEBUG))
+ msgpack_object_print(stderr, *map);
+
+ /* Init temporal buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
+ /* Make room for the timestamp */
+ size = map->via.map.size + 1;
+ msgpack_pack_map(&mp_pck, size);
+
+ /* Pack timestamp */
+ msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
+ msgpack_pack_str_body(&mp_pck,
+ ctx->timestamp_key, ctx->timestamp_key_len);
+ switch (ctx->timestamp_format) {
+ case FLB_JSON_DATE_DOUBLE:
+ msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
+ break;
+
+ case FLB_JSON_DATE_ISO8601:
+ case FLB_JSON_DATE_ISO8601_NS:
+ {
+ size_t date_len;
+ int len;
+ struct tm _tm;
+ char time_formatted[36];
+
+ /* Format the time; use microsecond precision (not nanoseconds). */
+ gmtime_r(&tm->tm.tv_sec, &_tm);
+ date_len = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_JSON_DATE_ISO8601_FMT, &_tm);
+
+ if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) {
+ len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
+ ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000);
+ }
+ else {
+ /* FLB_JSON_DATE_ISO8601_NS */
+ len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
+ ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec);
+ }
+ date_len += len;
+
+ msgpack_pack_str(&mp_pck, date_len);
+ msgpack_pack_str_body(&mp_pck, time_formatted, date_len);
+ }
+ break;
+ }
+ }
+ else {
+ size = map->via.map.size;
+ msgpack_pack_map(&mp_pck, size);
+ }
+
+ for (i = 0; i < map->via.map.size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+
+ msgpack_pack_object(&mp_pck, key);
+ msgpack_pack_object(&mp_pck, val);
+
+ /* Lookup message key */
+ if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
+ if (key.via.str.size == ctx->message_key_field_len &&
+ strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
+ message_key = (char *) val.via.str.ptr;
+ message_key_len = val.via.str.size;
+ }
+ }
+
+ /* Lookup key/topic */
+ if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) {
+ if (key.via.str.size == ctx->topic_key_len &&
+ strncmp(key.via.str.ptr, ctx->topic_key, ctx->topic_key_len) == 0) {
+ topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
+ val.via.str.size, ctx);
+ /* Add extracted topic on the fly to topiclist */
+ if (ctx->dynamic_topic) {
+ /* Only if default topic is set and this topicname is not set for this message */
+ if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
+ (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
+ if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
+ /* Don't allow commas in kafkatopic name */
+ flb_warn("',' not allowed in dynamic_kafka topic names");
+ continue;
+ }
+ if (val.via.str.size > 64) {
+ /* Don't allow length of dynamic kafka topics > 64 */
+ flb_warn(" dynamic kafka topic length > 64 not allowed");
+ continue;
+ }
+ dynamic_topic = flb_malloc(val.via.str.size + 1);
+ if (!dynamic_topic) {
+ /* Use default topic */
+ flb_errno();
+ continue;
+ }
+ strncpy(dynamic_topic, val.via.str.ptr, val.via.str.size);
+ dynamic_topic[val.via.str.size] = '\0';
+ topics = flb_utils_split(dynamic_topic, ',', 0);
+ if (!topics) {
+ /* Use the default topic */
+ flb_errno();
+ flb_free(dynamic_topic);
+ continue;
+ }
+ mk_list_foreach(head, topics) {
+ /* Add the (one) found topicname to the topic configuration */
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ topic = flb_kafka_topic_create(entry->value, ctx);
+ if (!topic) {
+ /* Use default topic */
+ flb_error("[out_kafka] cannot register topic '%s'",
+ entry->value);
+ topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
+ val.via.str.size, ctx);
+ }
+ else {
+ flb_info("[out_kafka] new topic added: %s", dynamic_topic);
+ }
+ }
+ flb_free(dynamic_topic);
+ }
+ }
+ }
+ }
+ }
+
+ if (ctx->format == FLB_KAFKA_FMT_JSON) {
+ s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (!s) {
+ flb_plg_error(ctx->ins, "error encoding to JSON");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ out_buf = s;
+ out_size = flb_sds_len(out_buf);
+ }
+ else if (ctx->format == FLB_KAFKA_FMT_MSGP) {
+ out_buf = mp_sbuf.data;
+ out_size = mp_sbuf.size;
+ }
+ else if (ctx->format == FLB_KAFKA_FMT_GELF) {
+ s = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size,
+ tm, &(ctx->gelf_fields));
+ if (s == NULL) {
+ flb_plg_error(ctx->ins, "error encoding to GELF");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ out_buf = s;
+ out_size = flb_sds_len(s);
+ }
+#ifdef FLB_HAVE_AVRO_ENCODER
+ else if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+
+ flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id);
+ flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);
+
+ // if there's no data then log it and return
+ if (mp_sbuf.size == 0) {
+ flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+ }
+
+ // is the line is too long log it and return
+ if (mp_sbuf.size > AVRO_LINE_MAX_LEN) {
+ flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+ }
+
+ flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
+ out_buf = avro_buff;
+ out_size = AVRO_DEFAULT_BUFFER_SIZE;
+
+ if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) {
+ flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
+ avro_fast_buffer = false;
+ // avro will always be smaller than msgpack
+ // it contains no meta-info aside from the schemaid
+ // all the metadata is in the schema which is not part of the msg
+ // add schemaid + magic byte for safety buffer and allocate
+ // that's 16 byte schemaid and one byte magic byte
+ out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD;
+ out_buf = flb_malloc(out_size);
+ if (!out_buf) {
+ flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ }
+
+ if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) {
+ flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!avro_fast_buffer) {
+ flb_free(out_buf);
+ }
+ return FLB_ERROR;
+ }
+
+ }
+#endif
+
+ if (!message_key) {
+ message_key = ctx->message_key;
+ message_key_len = ctx->message_key_len;
+ }
+
+ if (!topic) {
+ topic = flb_kafka_topic_default(ctx);
+ }
+ if (!topic) {
+ flb_plg_error(ctx->ins, "no default topic found");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+ return FLB_ERROR;
+ }
+
+ retry:
+ /*
+ * If the local rdkafka queue is full, we retry up to 'queue_full_retries'
+ * times set by the configuration (default: 10). If the configuration
+ * property was set to '0' or 'false', we don't impose a limit. Use that
+ * value under your own risk.
+ */
+ if (ctx->queue_full_retries > 0 &&
+ queue_full_retries >= ctx->queue_full_retries) {
+ if (ctx->format != FLB_KAFKA_FMT_MSGP) {
+ flb_sds_destroy(s);
+ }
+ msgpack_sbuffer_destroy(&mp_sbuf);
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+ /*
+ * Unblock the flush requests so that the
+ * engine could try sending data again.
+ */
+ ctx->blocked = FLB_FALSE;
+ return FLB_RETRY;
+ }
+
+ ret = rd_kafka_produce(topic->tp,
+ RD_KAFKA_PARTITION_UA,
+ RD_KAFKA_MSG_F_COPY,
+ out_buf, out_size,
+ message_key, message_key_len,
+ ctx);
+
+ if (ret == -1) {
+ flb_error(
+ "%% Failed to produce to topic %s: %s\n",
+ rd_kafka_topic_name(topic->tp),
+ rd_kafka_err2str(rd_kafka_last_error()));
+
+ /*
+ * rdkafka queue is full, keep trying 'locally' for a few seconds,
+ * otherwise let the caller to issue a main retry againt the engine.
+ */
+ if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+ flb_plg_warn(ctx->ins,
+ "internal queue is full, retrying in one second");
+
+ /*
+ * If the queue is full, first make sure to discard any further
+ * flush request from the engine. This means 'the caller will
+ * issue a retry at a later time'.
+ */
+ ctx->blocked = FLB_TRUE;
+
+ /*
+ * Next step is to give it some time to the background rdkafka
+ * library to do it own work. By default rdkafka wait 1 second
+ * or up to 10000 messages to be enqueued before delivery.
+ *
+ * If the kafka broker is down we should try a couple of times
+ * to enqueue this message, if we exceed 10 times, we just
+ * issue a full retry of the data chunk.
+ */
+ flb_time_sleep(1000);
+ rd_kafka_poll(ctx->kafka.rk, 0);
+
+ /* Issue a re-try */
+ queue_full_retries++;
+ goto retry;
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins, "enqueued message (%zd bytes) for topic '%s'",
+ out_size, rd_kafka_topic_name(topic->tp));
+ }
+ ctx->blocked = FLB_FALSE;
+
+ rd_kafka_poll(ctx->kafka.rk, 0);
+ if (ctx->format == FLB_KAFKA_FMT_JSON) {
+ flb_sds_destroy(s);
+ }
+ if (ctx->format == FLB_KAFKA_FMT_GELF) {
+ flb_sds_destroy(s);
+ }
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+}
+
+static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+
+ int ret;
+ struct flb_out_kafka *ctx = out_context;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ /*
+ * If the context is blocked, means rdkafka queue is full and no more
+ * messages can be appended. For our called (Fluent Bit engine) means
+ * that is not possible to work on this now and it need to 'retry'.
+ */
+ if (ctx->blocked == FLB_TRUE) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder,
+ (char *) event_chunk->data,
+ event_chunk->size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Iterate the original buffer and perform adjustments */
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ ret = produce_message(&log_event.timestamp,
+ log_event.body,
+ ctx, config);
+
+ if (ret != FLB_OK) {
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ FLB_OUTPUT_RETURN(ret);
+ }
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static void kafka_flush_force(struct flb_out_kafka *ctx,
+ struct flb_config *config)
+{
+ int ret;
+
+ if (!ctx) {
+ return;
+ }
+
+ if (ctx->kafka.rk) {
+ ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000);
+ if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
+ flb_plg_warn(ctx->ins, "Failed to force flush: %s",
+ rd_kafka_err2str(ret));
+ }
+ }
+}
+
+static int cb_kafka_exit(void *data, struct flb_config *config)
+{
+ struct flb_out_kafka *ctx = data;
+
+ kafka_flush_force(ctx, config);
+ flb_out_kafka_destroy(ctx);
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "topic_key", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, topic_key),
+ "Which record to use as the kafka topic."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "dynamic_topic", "false",
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, dynamic_topic),
+ "Activate dynamic topics."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "format", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str),
+ "Set the record output format."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "message_key", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key),
+ "Which record key to use as the message data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "message_key_field", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key_field),
+ "Which record key field to use as the message data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "timestamp_key", FLB_KAFKA_TS_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_key),
+ "Set the key for the the timestamp."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "timestamp_format", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str),
+ "Set the format the timestamp is in."
+ },
+ {
+ FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries),
+ "Set the number of local retries to enqueue the data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_timestamp_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the timestamp key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_host_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the host key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_short_message_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the short message key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_full_message_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the full message key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_level_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the level key for gelf output."
+ },
+#ifdef FLB_HAVE_AVRO_ENCODER
+ {
+ FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set AVRO schema."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set AVRO schema ID."
+ },
+#endif
+ {
+ FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka topics, delimited by commas."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka brokers, delimited by commas."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "client_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka client_id."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "group_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka group_id."
+ },
+ {
+ FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL,
+ //FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, rdkafka_opts),
+ 0, FLB_FALSE, 0,
+ "Set the kafka group_id."
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_output_plugin out_kafka_plugin = {
+ .name = "kafka",
+ .description = "Kafka",
+ .cb_init = cb_kafka_init,
+ .cb_flush = cb_kafka_flush,
+ .cb_exit = cb_kafka_exit,
+ .config_map = config_map,
+ .flags = 0
+};