summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_kafka/kafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_kafka/kafka.c')
-rw-r--r--src/fluent-bit/plugins/out_kafka/kafka.c658
1 files changed, 0 insertions, 658 deletions
diff --git a/src/fluent-bit/plugins/out_kafka/kafka.c b/src/fluent-bit/plugins/out_kafka/kafka.c
deleted file mode 100644
index ff700a687..000000000
--- a/src/fluent-bit/plugins/out_kafka/kafka.c
+++ /dev/null
@@ -1,658 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-
-#include "kafka_config.h"
-#include "kafka_topic.h"
-
-void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
- void *opaque)
-{
- struct flb_out_kafka *ctx = (struct flb_out_kafka *) opaque;
-
- if (rkmessage->err) {
- flb_plg_warn(ctx->ins, "message delivery failed: %s",
- rd_kafka_err2str(rkmessage->err));
- }
- else {
- flb_plg_debug(ctx->ins, "message delivered (%zd bytes, "
- "partition %"PRId32")",
- rkmessage->len, rkmessage->partition);
- }
-}
-
-void cb_kafka_logger(const rd_kafka_t *rk, int level,
- const char *fac, const char *buf)
-{
- struct flb_out_kafka *ctx;
-
- ctx = (struct flb_out_kafka *) rd_kafka_opaque(rk);
-
- if (level <= FLB_KAFKA_LOG_ERR) {
- flb_plg_error(ctx->ins, "%s: %s",
- rk ? rd_kafka_name(rk) : NULL, buf);
- }
- else if (level == FLB_KAFKA_LOG_WARNING) {
- flb_plg_warn(ctx->ins, "%s: %s",
- rk ? rd_kafka_name(rk) : NULL, buf);
- }
- else if (level == FLB_KAFKA_LOG_NOTICE || level == FLB_KAFKA_LOG_INFO) {
- flb_plg_info(ctx->ins, "%s: %s",
- rk ? rd_kafka_name(rk) : NULL, buf);
- }
- else if (level == FLB_KAFKA_LOG_DEBUG) {
- flb_plg_debug(ctx->ins, "%s: %s",
- rk ? rd_kafka_name(rk) : NULL, buf);
- }
-}
-
-static int cb_kafka_init(struct flb_output_instance *ins,
- struct flb_config *config,
- void *data)
-{
- struct flb_out_kafka *ctx;
-
- /* Configuration */
- ctx = flb_out_kafka_create(ins, config);
- if (!ctx) {
- flb_plg_error(ins, "failed to initialize");
- return -1;
- }
-
- /* Set global context */
- flb_output_set_context(ins, ctx);
- return 0;
-}
-
-int produce_message(struct flb_time *tm, msgpack_object *map,
- struct flb_out_kafka *ctx, struct flb_config *config)
-{
- int i;
- int ret;
- int size;
- int queue_full_retries = 0;
- char *out_buf;
- size_t out_size;
- struct mk_list *head;
- struct mk_list *topics;
- struct flb_split_entry *entry;
- char *dynamic_topic;
- char *message_key = NULL;
- size_t message_key_len = 0;
- struct flb_kafka_topic *topic = NULL;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- msgpack_object key;
- msgpack_object val;
- flb_sds_t s;
-
-#ifdef FLB_HAVE_AVRO_ENCODER
- // used to flag when a buffer needs to be freed for avro
- bool avro_fast_buffer = true;
-
- // avro encoding uses a buffer
- // the majority of lines are fairly small
- // so using static buffer for these is much more efficient
- // larger sizes will allocate
-#ifndef AVRO_DEFAULT_BUFFER_SIZE
-#define AVRO_DEFAULT_BUFFER_SIZE 2048
-#endif
- static char avro_buff[AVRO_DEFAULT_BUFFER_SIZE];
-
- // don't take lines that are too large
- // these lines will log a warning
- // this roughly a log line of 250000 chars
-#ifndef AVRO_LINE_MAX_LEN
-#define AVRO_LINE_MAX_LEN 1000000
-
- // this is a convenience
-#define AVRO_FREE(X, Y) if (!X) { flb_free(Y); }
-#endif
-
- // this is just to keep the code cleaner
- // the avro encoding includes
- // an embedded schemaid which is used
- // the embedding is a null byte
- // followed by a 16 byte schemaid
-#define AVRO_SCHEMA_OVERHEAD 16 + 1
-#endif
-
- flb_debug("in produce_message\n");
- if (flb_log_check(FLB_LOG_DEBUG))
- msgpack_object_print(stderr, *map);
-
- /* Init temporal buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
- /* Make room for the timestamp */
- size = map->via.map.size + 1;
- msgpack_pack_map(&mp_pck, size);
-
- /* Pack timestamp */
- msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
- msgpack_pack_str_body(&mp_pck,
- ctx->timestamp_key, ctx->timestamp_key_len);
- switch (ctx->timestamp_format) {
- case FLB_JSON_DATE_DOUBLE:
- msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
- break;
-
- case FLB_JSON_DATE_ISO8601:
- case FLB_JSON_DATE_ISO8601_NS:
- {
- size_t date_len;
- int len;
- struct tm _tm;
- char time_formatted[36];
-
- /* Format the time; use microsecond precision (not nanoseconds). */
- gmtime_r(&tm->tm.tv_sec, &_tm);
- date_len = strftime(time_formatted, sizeof(time_formatted) - 1,
- FLB_JSON_DATE_ISO8601_FMT, &_tm);
-
- if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) {
- len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
- ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000);
- }
- else {
- /* FLB_JSON_DATE_ISO8601_NS */
- len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
- ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec);
- }
- date_len += len;
-
- msgpack_pack_str(&mp_pck, date_len);
- msgpack_pack_str_body(&mp_pck, time_formatted, date_len);
- }
- break;
- }
- }
- else {
- size = map->via.map.size;
- msgpack_pack_map(&mp_pck, size);
- }
-
- for (i = 0; i < map->via.map.size; i++) {
- key = map->via.map.ptr[i].key;
- val = map->via.map.ptr[i].val;
-
- msgpack_pack_object(&mp_pck, key);
- msgpack_pack_object(&mp_pck, val);
-
- /* Lookup message key */
- if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
- if (key.via.str.size == ctx->message_key_field_len &&
- strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
- message_key = (char *) val.via.str.ptr;
- message_key_len = val.via.str.size;
- }
- }
-
- /* Lookup key/topic */
- if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) {
- if (key.via.str.size == ctx->topic_key_len &&
- strncmp(key.via.str.ptr, ctx->topic_key, ctx->topic_key_len) == 0) {
- topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
- val.via.str.size, ctx);
- /* Add extracted topic on the fly to topiclist */
- if (ctx->dynamic_topic) {
- /* Only if default topic is set and this topicname is not set for this message */
- if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
- (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
- if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
- /* Don't allow commas in kafkatopic name */
- flb_warn("',' not allowed in dynamic_kafka topic names");
- continue;
- }
- if (val.via.str.size > 64) {
- /* Don't allow length of dynamic kafka topics > 64 */
- flb_warn(" dynamic kafka topic length > 64 not allowed");
- continue;
- }
- dynamic_topic = flb_malloc(val.via.str.size + 1);
- if (!dynamic_topic) {
- /* Use default topic */
- flb_errno();
- continue;
- }
- strncpy(dynamic_topic, val.via.str.ptr, val.via.str.size);
- dynamic_topic[val.via.str.size] = '\0';
- topics = flb_utils_split(dynamic_topic, ',', 0);
- if (!topics) {
- /* Use the default topic */
- flb_errno();
- flb_free(dynamic_topic);
- continue;
- }
- mk_list_foreach(head, topics) {
- /* Add the (one) found topicname to the topic configuration */
- entry = mk_list_entry(head, struct flb_split_entry, _head);
- topic = flb_kafka_topic_create(entry->value, ctx);
- if (!topic) {
- /* Use default topic */
- flb_error("[out_kafka] cannot register topic '%s'",
- entry->value);
- topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
- val.via.str.size, ctx);
- }
- else {
- flb_info("[out_kafka] new topic added: %s", dynamic_topic);
- }
- }
- flb_free(dynamic_topic);
- }
- }
- }
- }
- }
-
- if (ctx->format == FLB_KAFKA_FMT_JSON) {
- s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
- if (!s) {
- flb_plg_error(ctx->ins, "error encoding to JSON");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_ERROR;
- }
- out_buf = s;
- out_size = flb_sds_len(out_buf);
- }
- else if (ctx->format == FLB_KAFKA_FMT_MSGP) {
- out_buf = mp_sbuf.data;
- out_size = mp_sbuf.size;
- }
- else if (ctx->format == FLB_KAFKA_FMT_GELF) {
- s = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size,
- tm, &(ctx->gelf_fields));
- if (s == NULL) {
- flb_plg_error(ctx->ins, "error encoding to GELF");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_ERROR;
- }
- out_buf = s;
- out_size = flb_sds_len(s);
- }
-#ifdef FLB_HAVE_AVRO_ENCODER
- else if (ctx->format == FLB_KAFKA_FMT_AVRO) {
-
- flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id);
- flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);
-
- // if there's no data then log it and return
- if (mp_sbuf.size == 0) {
- flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id);
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_OK;
- }
-
- // is the line is too long log it and return
- if (mp_sbuf.size > AVRO_LINE_MAX_LEN) {
- flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_OK;
- }
-
- flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
- out_buf = avro_buff;
- out_size = AVRO_DEFAULT_BUFFER_SIZE;
-
- if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) {
- flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
- avro_fast_buffer = false;
- // avro will always be smaller than msgpack
- // it contains no meta-info aside from the schemaid
- // all the metadata is in the schema which is not part of the msg
- // add schemaid + magic byte for safety buffer and allocate
- // that's 16 byte schemaid and one byte magic byte
- out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD;
- out_buf = flb_malloc(out_size);
- if (!out_buf) {
- flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_ERROR;
- }
- }
-
- if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) {
- flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
- msgpack_sbuffer_destroy(&mp_sbuf);
- if (!avro_fast_buffer) {
- flb_free(out_buf);
- }
- return FLB_ERROR;
- }
-
- }
-#endif
-
- if (!message_key) {
- message_key = ctx->message_key;
- message_key_len = ctx->message_key_len;
- }
-
- if (!topic) {
- topic = flb_kafka_topic_default(ctx);
- }
- if (!topic) {
- flb_plg_error(ctx->ins, "no default topic found");
- msgpack_sbuffer_destroy(&mp_sbuf);
-#ifdef FLB_HAVE_AVRO_ENCODER
- if (ctx->format == FLB_KAFKA_FMT_AVRO) {
- AVRO_FREE(avro_fast_buffer, out_buf)
- }
-#endif
- return FLB_ERROR;
- }
-
- retry:
- /*
- * If the local rdkafka queue is full, we retry up to 'queue_full_retries'
- * times set by the configuration (default: 10). If the configuration
- * property was set to '0' or 'false', we don't impose a limit. Use that
- * value under your own risk.
- */
- if (ctx->queue_full_retries > 0 &&
- queue_full_retries >= ctx->queue_full_retries) {
- if (ctx->format != FLB_KAFKA_FMT_MSGP) {
- flb_sds_destroy(s);
- }
- msgpack_sbuffer_destroy(&mp_sbuf);
-#ifdef FLB_HAVE_AVRO_ENCODER
- if (ctx->format == FLB_KAFKA_FMT_AVRO) {
- AVRO_FREE(avro_fast_buffer, out_buf)
- }
-#endif
- /*
- * Unblock the flush requests so that the
- * engine could try sending data again.
- */
- ctx->blocked = FLB_FALSE;
- return FLB_RETRY;
- }
-
- ret = rd_kafka_produce(topic->tp,
- RD_KAFKA_PARTITION_UA,
- RD_KAFKA_MSG_F_COPY,
- out_buf, out_size,
- message_key, message_key_len,
- ctx);
-
- if (ret == -1) {
- flb_error(
- "%% Failed to produce to topic %s: %s\n",
- rd_kafka_topic_name(topic->tp),
- rd_kafka_err2str(rd_kafka_last_error()));
-
- /*
- * rdkafka queue is full, keep trying 'locally' for a few seconds,
- * otherwise let the caller to issue a main retry againt the engine.
- */
- if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
- flb_plg_warn(ctx->ins,
- "internal queue is full, retrying in one second");
-
- /*
- * If the queue is full, first make sure to discard any further
- * flush request from the engine. This means 'the caller will
- * issue a retry at a later time'.
- */
- ctx->blocked = FLB_TRUE;
-
- /*
- * Next step is to give it some time to the background rdkafka
- * library to do it own work. By default rdkafka wait 1 second
- * or up to 10000 messages to be enqueued before delivery.
- *
- * If the kafka broker is down we should try a couple of times
- * to enqueue this message, if we exceed 10 times, we just
- * issue a full retry of the data chunk.
- */
- flb_time_sleep(1000);
- rd_kafka_poll(ctx->kafka.rk, 0);
-
- /* Issue a re-try */
- queue_full_retries++;
- goto retry;
- }
- }
- else {
- flb_plg_debug(ctx->ins, "enqueued message (%zd bytes) for topic '%s'",
- out_size, rd_kafka_topic_name(topic->tp));
- }
- ctx->blocked = FLB_FALSE;
-
- rd_kafka_poll(ctx->kafka.rk, 0);
- if (ctx->format == FLB_KAFKA_FMT_JSON) {
- flb_sds_destroy(s);
- }
- if (ctx->format == FLB_KAFKA_FMT_GELF) {
- flb_sds_destroy(s);
- }
-#ifdef FLB_HAVE_AVRO_ENCODER
- if (ctx->format == FLB_KAFKA_FMT_AVRO) {
- AVRO_FREE(avro_fast_buffer, out_buf)
- }
-#endif
-
- msgpack_sbuffer_destroy(&mp_sbuf);
- return FLB_OK;
-}
-
-static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *i_ins,
- void *out_context,
- struct flb_config *config)
-{
-
- int ret;
- struct flb_out_kafka *ctx = out_context;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- /*
- * If the context is blocked, means rdkafka queue is full and no more
- * messages can be appended. For our called (Fluent Bit engine) means
- * that is not possible to work on this now and it need to 'retry'.
- */
- if (ctx->blocked == FLB_TRUE) {
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- ret = flb_log_event_decoder_init(&log_decoder,
- (char *) event_chunk->data,
- event_chunk->size);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Iterate the original buffer and perform adjustments */
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- ret = produce_message(&log_event.timestamp,
- log_event.body,
- ctx, config);
-
- if (ret != FLB_OK) {
- flb_log_event_decoder_destroy(&log_decoder);
-
- FLB_OUTPUT_RETURN(ret);
- }
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- FLB_OUTPUT_RETURN(FLB_OK);
-}
-
-static void kafka_flush_force(struct flb_out_kafka *ctx,
- struct flb_config *config)
-{
- int ret;
-
- if (!ctx) {
- return;
- }
-
- if (ctx->kafka.rk) {
- ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000);
- if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
- flb_plg_warn(ctx->ins, "Failed to force flush: %s",
- rd_kafka_err2str(ret));
- }
- }
-}
-
-static int cb_kafka_exit(void *data, struct flb_config *config)
-{
- struct flb_out_kafka *ctx = data;
-
- kafka_flush_force(ctx, config);
- flb_out_kafka_destroy(ctx);
- return 0;
-}
-
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "topic_key", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, topic_key),
- "Which record to use as the kafka topic."
- },
- {
- FLB_CONFIG_MAP_BOOL, "dynamic_topic", "false",
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, dynamic_topic),
- "Activate dynamic topics."
- },
- {
- FLB_CONFIG_MAP_STR, "format", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str),
- "Set the record output format."
- },
- {
- FLB_CONFIG_MAP_STR, "message_key", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key),
- "Which record key to use as the message data."
- },
- {
- FLB_CONFIG_MAP_STR, "message_key_field", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key_field),
- "Which record key field to use as the message data."
- },
- {
- FLB_CONFIG_MAP_STR, "timestamp_key", FLB_KAFKA_TS_KEY,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_key),
- "Set the key for the the timestamp."
- },
- {
- FLB_CONFIG_MAP_STR, "timestamp_format", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str),
- "Set the format the timestamp is in."
- },
- {
- FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES,
- 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries),
- "Set the number of local retries to enqueue the data."
- },
- {
- FLB_CONFIG_MAP_STR, "gelf_timestamp_key", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the timestamp key for gelf output."
- },
- {
- FLB_CONFIG_MAP_STR, "gelf_host_key", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the host key for gelf output."
- },
- {
- FLB_CONFIG_MAP_STR, "gelf_short_message_key", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the short message key for gelf output."
- },
- {
- FLB_CONFIG_MAP_STR, "gelf_full_message_key", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the full message key for gelf output."
- },
- {
- FLB_CONFIG_MAP_STR, "gelf_level_key", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the level key for gelf output."
- },
-#ifdef FLB_HAVE_AVRO_ENCODER
- {
- FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set AVRO schema."
- },
- {
- FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set AVRO schema ID."
- },
-#endif
- {
- FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the kafka topics, delimited by commas."
- },
- {
- FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the kafka brokers, delimited by commas."
- },
- {
- FLB_CONFIG_MAP_STR, "client_id", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the kafka client_id."
- },
- {
- FLB_CONFIG_MAP_STR, "group_id", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the kafka group_id."
- },
- {
- FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL,
- //FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, rdkafka_opts),
- 0, FLB_FALSE, 0,
- "Set the kafka group_id."
- },
- /* EOF */
- {0}
-};
-
-struct flb_output_plugin out_kafka_plugin = {
- .name = "kafka",
- .description = "Kafka",
- .cb_init = cb_kafka_init,
- .cb_flush = cb_kafka_flush,
- .cb_exit = cb_kafka_exit,
- .config_map = config_map,
- .flags = 0
-};