summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_kafka
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/out_kafka
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_kafka')
-rw-r--r--fluent-bit/plugins/out_kafka/CMakeLists.txt8
-rw-r--r--fluent-bit/plugins/out_kafka/kafka.c658
-rw-r--r--fluent-bit/plugins/out_kafka/kafka_callbacks.h31
-rw-r--r--fluent-bit/plugins/out_kafka/kafka_config.c253
-rw-r--r--fluent-bit/plugins/out_kafka/kafka_config.h129
-rw-r--r--fluent-bit/plugins/out_kafka/kafka_topic.c120
-rw-r--r--fluent-bit/plugins/out_kafka/kafka_topic.h34
7 files changed, 1233 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka/CMakeLists.txt b/fluent-bit/plugins/out_kafka/CMakeLists.txt
new file mode 100644
index 000000000..526910d49
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Fluent Bit Kafka Output plugin
+set(src
+ kafka_config.c
+ kafka_topic.c
+ kafka.c)
+
+FLB_PLUGIN(out_kafka "${src}" "rdkafka")
+target_link_libraries(flb-plugin-out_kafka -lpthread)
diff --git a/fluent-bit/plugins/out_kafka/kafka.c b/fluent-bit/plugins/out_kafka/kafka.c
new file mode 100644
index 000000000..ff700a687
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka.c
@@ -0,0 +1,658 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "kafka_config.h"
+#include "kafka_topic.h"
+
+void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
+ void *opaque)
+{
+ struct flb_out_kafka *ctx = (struct flb_out_kafka *) opaque;
+
+ if (rkmessage->err) {
+ flb_plg_warn(ctx->ins, "message delivery failed: %s",
+ rd_kafka_err2str(rkmessage->err));
+ }
+ else {
+ flb_plg_debug(ctx->ins, "message delivered (%zd bytes, "
+ "partition %"PRId32")",
+ rkmessage->len, rkmessage->partition);
+ }
+}
+
+void cb_kafka_logger(const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf)
+{
+ struct flb_out_kafka *ctx;
+
+ ctx = (struct flb_out_kafka *) rd_kafka_opaque(rk);
+
+ if (level <= FLB_KAFKA_LOG_ERR) {
+ flb_plg_error(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_WARNING) {
+ flb_plg_warn(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_NOTICE || level == FLB_KAFKA_LOG_INFO) {
+ flb_plg_info(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+ else if (level == FLB_KAFKA_LOG_DEBUG) {
+ flb_plg_debug(ctx->ins, "%s: %s",
+ rk ? rd_kafka_name(rk) : NULL, buf);
+ }
+}
+
+static int cb_kafka_init(struct flb_output_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ struct flb_out_kafka *ctx;
+
+ /* Configuration */
+ ctx = flb_out_kafka_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "failed to initialize");
+ return -1;
+ }
+
+ /* Set global context */
+ flb_output_set_context(ins, ctx);
+ return 0;
+}
+
+int produce_message(struct flb_time *tm, msgpack_object *map,
+ struct flb_out_kafka *ctx, struct flb_config *config)
+{
+ int i;
+ int ret;
+ int size;
+ int queue_full_retries = 0;
+ char *out_buf;
+ size_t out_size;
+ struct mk_list *head;
+ struct mk_list *topics;
+ struct flb_split_entry *entry;
+ char *dynamic_topic;
+ char *message_key = NULL;
+ size_t message_key_len = 0;
+ struct flb_kafka_topic *topic = NULL;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ msgpack_object key;
+ msgpack_object val;
+ flb_sds_t s;
+
+#ifdef FLB_HAVE_AVRO_ENCODER
+ // used to flag when a buffer needs to be freed for avro
+ bool avro_fast_buffer = true;
+
+ // avro encoding uses a buffer
+ // the majority of lines are fairly small
+ // so using static buffer for these is much more efficient
+ // larger sizes will allocate
+#ifndef AVRO_DEFAULT_BUFFER_SIZE
+#define AVRO_DEFAULT_BUFFER_SIZE 2048
+#endif
+ static char avro_buff[AVRO_DEFAULT_BUFFER_SIZE];
+
+ // don't take lines that are too large
+ // these lines will log a warning
+ // this roughly a log line of 250000 chars
+#ifndef AVRO_LINE_MAX_LEN
+#define AVRO_LINE_MAX_LEN 1000000
+
+ // this is a convenience
+#define AVRO_FREE(X, Y) if (!X) { flb_free(Y); }
+#endif
+
+ // this is just to keep the code cleaner
+ // the avro encoding includes
+ // an embedded schemaid which is used
+ // the embedding is a null byte
+ // followed by a 16 byte schemaid
+#define AVRO_SCHEMA_OVERHEAD 16 + 1
+#endif
+
+ flb_debug("in produce_message\n");
+ if (flb_log_check(FLB_LOG_DEBUG))
+ msgpack_object_print(stderr, *map);
+
+ /* Init temporal buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
+ /* Make room for the timestamp */
+ size = map->via.map.size + 1;
+ msgpack_pack_map(&mp_pck, size);
+
+ /* Pack timestamp */
+ msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
+ msgpack_pack_str_body(&mp_pck,
+ ctx->timestamp_key, ctx->timestamp_key_len);
+ switch (ctx->timestamp_format) {
+ case FLB_JSON_DATE_DOUBLE:
+ msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
+ break;
+
+ case FLB_JSON_DATE_ISO8601:
+ case FLB_JSON_DATE_ISO8601_NS:
+ {
+ size_t date_len;
+ int len;
+ struct tm _tm;
+ char time_formatted[36];
+
+ /* Format the time; use microsecond precision (not nanoseconds). */
+ gmtime_r(&tm->tm.tv_sec, &_tm);
+ date_len = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_JSON_DATE_ISO8601_FMT, &_tm);
+
+ if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) {
+ len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
+ ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000);
+ }
+ else {
+ /* FLB_JSON_DATE_ISO8601_NS */
+ len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
+ ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec);
+ }
+ date_len += len;
+
+ msgpack_pack_str(&mp_pck, date_len);
+ msgpack_pack_str_body(&mp_pck, time_formatted, date_len);
+ }
+ break;
+ }
+ }
+ else {
+ size = map->via.map.size;
+ msgpack_pack_map(&mp_pck, size);
+ }
+
+ for (i = 0; i < map->via.map.size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+
+ msgpack_pack_object(&mp_pck, key);
+ msgpack_pack_object(&mp_pck, val);
+
+ /* Lookup message key */
+ if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
+ if (key.via.str.size == ctx->message_key_field_len &&
+ strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
+ message_key = (char *) val.via.str.ptr;
+ message_key_len = val.via.str.size;
+ }
+ }
+
+ /* Lookup key/topic */
+ if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) {
+ if (key.via.str.size == ctx->topic_key_len &&
+ strncmp(key.via.str.ptr, ctx->topic_key, ctx->topic_key_len) == 0) {
+ topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
+ val.via.str.size, ctx);
+ /* Add extracted topic on the fly to topiclist */
+ if (ctx->dynamic_topic) {
+ /* Only if default topic is set and this topicname is not set for this message */
+ if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
+ (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
+ if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
+ /* Don't allow commas in kafkatopic name */
+ flb_warn("',' not allowed in dynamic_kafka topic names");
+ continue;
+ }
+ if (val.via.str.size > 64) {
+ /* Don't allow length of dynamic kafka topics > 64 */
+ flb_warn(" dynamic kafka topic length > 64 not allowed");
+ continue;
+ }
+ dynamic_topic = flb_malloc(val.via.str.size + 1);
+ if (!dynamic_topic) {
+ /* Use default topic */
+ flb_errno();
+ continue;
+ }
+ strncpy(dynamic_topic, val.via.str.ptr, val.via.str.size);
+ dynamic_topic[val.via.str.size] = '\0';
+ topics = flb_utils_split(dynamic_topic, ',', 0);
+ if (!topics) {
+ /* Use the default topic */
+ flb_errno();
+ flb_free(dynamic_topic);
+ continue;
+ }
+ mk_list_foreach(head, topics) {
+ /* Add the (one) found topicname to the topic configuration */
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ topic = flb_kafka_topic_create(entry->value, ctx);
+ if (!topic) {
+ /* Use default topic */
+ flb_error("[out_kafka] cannot register topic '%s'",
+ entry->value);
+ topic = flb_kafka_topic_lookup((char *) val.via.str.ptr,
+ val.via.str.size, ctx);
+ }
+ else {
+ flb_info("[out_kafka] new topic added: %s", dynamic_topic);
+ }
+ }
+ flb_free(dynamic_topic);
+ }
+ }
+ }
+ }
+ }
+
+ if (ctx->format == FLB_KAFKA_FMT_JSON) {
+ s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (!s) {
+ flb_plg_error(ctx->ins, "error encoding to JSON");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ out_buf = s;
+ out_size = flb_sds_len(out_buf);
+ }
+ else if (ctx->format == FLB_KAFKA_FMT_MSGP) {
+ out_buf = mp_sbuf.data;
+ out_size = mp_sbuf.size;
+ }
+ else if (ctx->format == FLB_KAFKA_FMT_GELF) {
+ s = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size,
+ tm, &(ctx->gelf_fields));
+ if (s == NULL) {
+ flb_plg_error(ctx->ins, "error encoding to GELF");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ out_buf = s;
+ out_size = flb_sds_len(s);
+ }
+#ifdef FLB_HAVE_AVRO_ENCODER
+ else if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+
+ flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id);
+ flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);
+
+ // if there's no data then log it and return
+ if (mp_sbuf.size == 0) {
+ flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+ }
+
+ // is the line is too long log it and return
+ if (mp_sbuf.size > AVRO_LINE_MAX_LEN) {
+ flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+ }
+
+ flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
+ out_buf = avro_buff;
+ out_size = AVRO_DEFAULT_BUFFER_SIZE;
+
+ if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) {
+ flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
+ avro_fast_buffer = false;
+ // avro will always be smaller than msgpack
+ // it contains no meta-info aside from the schemaid
+ // all the metadata is in the schema which is not part of the msg
+ // add schemaid + magic byte for safety buffer and allocate
+ // that's 16 byte schemaid and one byte magic byte
+ out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD;
+ out_buf = flb_malloc(out_size);
+ if (!out_buf) {
+ flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_ERROR;
+ }
+ }
+
+ if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) {
+ flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!avro_fast_buffer) {
+ flb_free(out_buf);
+ }
+ return FLB_ERROR;
+ }
+
+ }
+#endif
+
+ if (!message_key) {
+ message_key = ctx->message_key;
+ message_key_len = ctx->message_key_len;
+ }
+
+ if (!topic) {
+ topic = flb_kafka_topic_default(ctx);
+ }
+ if (!topic) {
+ flb_plg_error(ctx->ins, "no default topic found");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+ return FLB_ERROR;
+ }
+
+ retry:
+ /*
+ * If the local rdkafka queue is full, we retry up to 'queue_full_retries'
+ * times set by the configuration (default: 10). If the configuration
+ * property was set to '0' or 'false', we don't impose a limit. Use that
+ * value under your own risk.
+ */
+ if (ctx->queue_full_retries > 0 &&
+ queue_full_retries >= ctx->queue_full_retries) {
+ if (ctx->format != FLB_KAFKA_FMT_MSGP) {
+ flb_sds_destroy(s);
+ }
+ msgpack_sbuffer_destroy(&mp_sbuf);
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+ /*
+ * Unblock the flush requests so that the
+ * engine could try sending data again.
+ */
+ ctx->blocked = FLB_FALSE;
+ return FLB_RETRY;
+ }
+
+ ret = rd_kafka_produce(topic->tp,
+ RD_KAFKA_PARTITION_UA,
+ RD_KAFKA_MSG_F_COPY,
+ out_buf, out_size,
+ message_key, message_key_len,
+ ctx);
+
+ if (ret == -1) {
+ flb_error(
+ "%% Failed to produce to topic %s: %s\n",
+ rd_kafka_topic_name(topic->tp),
+ rd_kafka_err2str(rd_kafka_last_error()));
+
+ /*
+ * rdkafka queue is full, keep trying 'locally' for a few seconds,
+ * otherwise let the caller to issue a main retry againt the engine.
+ */
+ if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+ flb_plg_warn(ctx->ins,
+ "internal queue is full, retrying in one second");
+
+ /*
+ * If the queue is full, first make sure to discard any further
+ * flush request from the engine. This means 'the caller will
+ * issue a retry at a later time'.
+ */
+ ctx->blocked = FLB_TRUE;
+
+ /*
+ * Next step is to give it some time to the background rdkafka
+ * library to do it own work. By default rdkafka wait 1 second
+ * or up to 10000 messages to be enqueued before delivery.
+ *
+ * If the kafka broker is down we should try a couple of times
+ * to enqueue this message, if we exceed 10 times, we just
+ * issue a full retry of the data chunk.
+ */
+ flb_time_sleep(1000);
+ rd_kafka_poll(ctx->kafka.rk, 0);
+
+ /* Issue a re-try */
+ queue_full_retries++;
+ goto retry;
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins, "enqueued message (%zd bytes) for topic '%s'",
+ out_size, rd_kafka_topic_name(topic->tp));
+ }
+ ctx->blocked = FLB_FALSE;
+
+ rd_kafka_poll(ctx->kafka.rk, 0);
+ if (ctx->format == FLB_KAFKA_FMT_JSON) {
+ flb_sds_destroy(s);
+ }
+ if (ctx->format == FLB_KAFKA_FMT_GELF) {
+ flb_sds_destroy(s);
+ }
+#ifdef FLB_HAVE_AVRO_ENCODER
+ if (ctx->format == FLB_KAFKA_FMT_AVRO) {
+ AVRO_FREE(avro_fast_buffer, out_buf)
+ }
+#endif
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_OK;
+}
+
+static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+
+ int ret;
+ struct flb_out_kafka *ctx = out_context;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ /*
+ * If the context is blocked, means rdkafka queue is full and no more
+ * messages can be appended. For our called (Fluent Bit engine) means
+ * that is not possible to work on this now and it need to 'retry'.
+ */
+ if (ctx->blocked == FLB_TRUE) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder,
+ (char *) event_chunk->data,
+ event_chunk->size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Iterate the original buffer and perform adjustments */
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ ret = produce_message(&log_event.timestamp,
+ log_event.body,
+ ctx, config);
+
+ if (ret != FLB_OK) {
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ FLB_OUTPUT_RETURN(ret);
+ }
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static void kafka_flush_force(struct flb_out_kafka *ctx,
+ struct flb_config *config)
+{
+ int ret;
+
+ if (!ctx) {
+ return;
+ }
+
+ if (ctx->kafka.rk) {
+ ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000);
+ if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
+ flb_plg_warn(ctx->ins, "Failed to force flush: %s",
+ rd_kafka_err2str(ret));
+ }
+ }
+}
+
+static int cb_kafka_exit(void *data, struct flb_config *config)
+{
+ struct flb_out_kafka *ctx = data;
+
+ kafka_flush_force(ctx, config);
+ flb_out_kafka_destroy(ctx);
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "topic_key", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, topic_key),
+ "Which record to use as the kafka topic."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "dynamic_topic", "false",
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, dynamic_topic),
+ "Activate dynamic topics."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "format", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str),
+ "Set the record output format."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "message_key", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key),
+ "Which record key to use as the message data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "message_key_field", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key_field),
+ "Which record key field to use as the message data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "timestamp_key", FLB_KAFKA_TS_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_key),
+ "Set the key for the the timestamp."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "timestamp_format", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str),
+ "Set the format the timestamp is in."
+ },
+ {
+ FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES,
+ 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries),
+ "Set the number of local retries to enqueue the data."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_timestamp_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the timestamp key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_host_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the host key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_short_message_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the short message key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_full_message_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the full message key for gelf output."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_level_key", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the level key for gelf output."
+ },
+#ifdef FLB_HAVE_AVRO_ENCODER
+ {
+ FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set AVRO schema."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set AVRO schema ID."
+ },
+#endif
+ {
+ FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka topics, delimited by commas."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka brokers, delimited by commas."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "client_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka client_id."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "group_id", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the kafka group_id."
+ },
+ {
+ FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL,
+ //FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, rdkafka_opts),
+ 0, FLB_FALSE, 0,
+ "Set the kafka group_id."
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_output_plugin out_kafka_plugin = {
+ .name = "kafka",
+ .description = "Kafka",
+ .cb_init = cb_kafka_init,
+ .cb_flush = cb_kafka_flush,
+ .cb_exit = cb_kafka_exit,
+ .config_map = config_map,
+ .flags = 0
+};
diff --git a/fluent-bit/plugins/out_kafka/kafka_callbacks.h b/fluent-bit/plugins/out_kafka/kafka_callbacks.h
new file mode 100644
index 000000000..f496cba8c
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka_callbacks.h
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_KAFKA_CALLBACKS_H
+#define FLB_OUT_KAFKA_CALLBACKS_H
+
+#include "rdkafka.h"
+
+void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
+ void *opaque);
+
+void cb_kafka_logger(const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf);
+
+#endif
diff --git a/fluent-bit/plugins/out_kafka/kafka_config.c b/fluent-bit/plugins/out_kafka/kafka_config.c
new file mode 100644
index 000000000..3c00f3682
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka_config.c
@@ -0,0 +1,253 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "kafka_config.h"
+#include "kafka_topic.h"
+#include "kafka_callbacks.h"
+
+struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ const char *tmp;
+ char errstr[512];
+ struct mk_list *head;
+ struct mk_list *topics;
+ struct flb_split_entry *entry;
+ struct flb_out_kafka *ctx;
+
+ /* Configuration context */
+ ctx = flb_calloc(1, sizeof(struct flb_out_kafka));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ ctx->blocked = FLB_FALSE;
+
+ ret = flb_output_config_map_set(ins, (void*) ctx);
+ if (ret == -1) {
+ flb_plg_error(ins, "unable to load configuration.");
+ flb_free(ctx);
+
+ return NULL;
+ }
+
+ /* rdkafka config context */
+ ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0);
+ if (!ctx->conf) {
+ flb_plg_error(ctx->ins, "error creating context");
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Set our global opaque data (plugin context*/
+ rd_kafka_conf_set_opaque(ctx->conf, ctx);
+
+ /* Callback: message delivery */
+ rd_kafka_conf_set_dr_msg_cb(ctx->conf, cb_kafka_msg);
+
+ /* Callback: log */
+ rd_kafka_conf_set_log_cb(ctx->conf, cb_kafka_logger);
+
+ /* Config: Topic_Key */
+ if (ctx->topic_key) {
+ ctx->topic_key_len = strlen(ctx->topic_key);
+ }
+
+ /* Config: Format */
+ if (ctx->format_str) {
+ if (strcasecmp(ctx->format_str, "json") == 0) {
+ ctx->format = FLB_KAFKA_FMT_JSON;
+ }
+ else if (strcasecmp(ctx->format_str, "msgpack") == 0) {
+ ctx->format = FLB_KAFKA_FMT_MSGP;
+ }
+ else if (strcasecmp(ctx->format_str, "gelf") == 0) {
+ ctx->format = FLB_KAFKA_FMT_GELF;
+ }
+#ifdef FLB_HAVE_AVRO_ENCODER
+ else if (strcasecmp(ctx->format_str, "avro") == 0) {
+ ctx->format = FLB_KAFKA_FMT_AVRO;
+ }
+#endif
+ }
+ else {
+ ctx->format = FLB_KAFKA_FMT_JSON;
+ }
+
+ /* Config: Message_Key */
+ if (ctx->message_key) {
+ ctx->message_key_len = strlen(ctx->message_key);
+ }
+ else {
+ ctx->message_key_len = 0;
+ }
+
+ /* Config: Message_Key_Field */
+ if (ctx->message_key_field) {
+ ctx->message_key_field_len = strlen(ctx->message_key_field);
+ }
+ else {
+ ctx->message_key_field_len = 0;
+ }
+
+ /* Config: Timestamp_Key */
+ if (ctx->timestamp_key) {
+ ctx->timestamp_key_len = strlen(ctx->timestamp_key);
+ }
+
+ /* Config: Timestamp_Format */
+ ctx->timestamp_format = FLB_JSON_DATE_DOUBLE;
+ if (ctx->timestamp_format_str) {
+ if (strcasecmp(ctx->timestamp_format_str, "iso8601") == 0) {
+ ctx->timestamp_format = FLB_JSON_DATE_ISO8601;
+ }
+ else if (strcasecmp(ctx->timestamp_format_str, "iso8601_ns") == 0) {
+ ctx->timestamp_format = FLB_JSON_DATE_ISO8601_NS;
+ }
+ }
+
+ /* set number of retries: note that if the number is zero, means forever */
+ if (ctx->queue_full_retries < 0) {
+ ctx->queue_full_retries = 0;
+ }
+
+ /* Config Gelf_Short_Message_Key */
+ tmp = flb_output_get_property("gelf_short_message_key", ins);
+ if (tmp) {
+ ctx->gelf_fields.short_message_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Full_Message_Key */
+ tmp = flb_output_get_property("gelf_full_message_key", ins);
+ if (tmp) {
+ ctx->gelf_fields.full_message_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Level_Key */
+ tmp = flb_output_get_property("gelf_level_key", ins);
+ if (tmp) {
+ ctx->gelf_fields.level_key = flb_sds_create(tmp);
+ }
+
+ /* Kafka Producer */
+ ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf,
+ errstr, sizeof(errstr));
+ if (!ctx->kafka.rk) {
+ flb_plg_error(ctx->ins, "failed to create producer: %s",
+ errstr);
+ flb_out_kafka_destroy(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_AVRO_ENCODER
+ /* Config AVRO */
+ tmp = flb_output_get_property("schema_str", ins);
+ if (tmp) {
+ ctx->avro_fields.schema_str = flb_sds_create(tmp);
+ }
+ tmp = flb_output_get_property("schema_id", ins);
+ if (tmp) {
+ ctx->avro_fields.schema_id = flb_sds_create(tmp);
+ }
+#endif
+
+ /* Config: Topic */
+ mk_list_init(&ctx->topics);
+ tmp = flb_output_get_property("topics", ins);
+ if (!tmp) {
+ flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx);
+ }
+ else {
+ topics = flb_utils_split(tmp, ',', -1);
+ if (!topics) {
+ flb_plg_warn(ctx->ins, "invalid topics defined, setting default");
+ flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx);
+ }
+ else {
+ /* Register each topic */
+ mk_list_foreach(head, topics) {
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ if (!flb_kafka_topic_create(entry->value, ctx)) {
+ flb_plg_error(ctx->ins, "cannot register topic '%s'",
+ entry->value);
+ }
+ }
+ flb_utils_split_free(topics);
+ }
+ }
+
+ flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp);
+#ifdef FLB_HAVE_AVRO_ENCODER
+ flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
+#endif
+
+ return ctx;
+}
+
+int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
+{
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->kafka.brokers) {
+ flb_free(ctx->kafka.brokers);
+ }
+
+ flb_kafka_topic_destroy_all(ctx);
+
+ if (ctx->kafka.rk) {
+ rd_kafka_destroy(ctx->kafka.rk);
+ }
+
+ if (ctx->topic_key) {
+ flb_free(ctx->topic_key);
+ }
+
+ if (ctx->message_key) {
+ flb_free(ctx->message_key);
+ }
+
+ if (ctx->message_key_field) {
+ flb_free(ctx->message_key_field);
+ }
+
+ flb_sds_destroy(ctx->gelf_fields.timestamp_key);
+ flb_sds_destroy(ctx->gelf_fields.host_key);
+ flb_sds_destroy(ctx->gelf_fields.short_message_key);
+ flb_sds_destroy(ctx->gelf_fields.full_message_key);
+ flb_sds_destroy(ctx->gelf_fields.level_key);
+
+#ifdef FLB_HAVE_AVRO_ENCODER
+ // avro
+ flb_sds_destroy(ctx->avro_fields.schema_id);
+ flb_sds_destroy(ctx->avro_fields.schema_str);
+#endif
+
+ flb_free(ctx);
+ return 0;
+}
diff --git a/fluent-bit/plugins/out_kafka/kafka_config.h b/fluent-bit/plugins/out_kafka/kafka_config.h
new file mode 100644
index 000000000..1ef2cce16
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka_config.h
@@ -0,0 +1,129 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_KAFKA_CONFIG_H
+#define FLB_OUT_KAFKA_CONFIG_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#ifdef FLB_HAVE_AVRO_ENCODER
+#include <fluent-bit/flb_avro.h>
+#endif
+
+#include <fluent-bit/flb_kafka.h>
+
+#define FLB_KAFKA_FMT_JSON 0
+#define FLB_KAFKA_FMT_MSGP 1
+#define FLB_KAFKA_FMT_GELF 2
+#ifdef FLB_HAVE_AVRO_ENCODER
+#define FLB_KAFKA_FMT_AVRO 3
+#endif
+#define FLB_KAFKA_TS_KEY "@timestamp"
+#define FLB_KAFKA_QUEUE_FULL_RETRIES "10"
+
+/* rdkafka log levels based on syslog(3) */
+#define FLB_KAFKA_LOG_EMERG 0
+#define FLB_KAFKA_LOG_ALERT 1
+#define FLB_KAFKA_LOG_CRIT 2
+#define FLB_KAFKA_LOG_ERR 3
+#define FLB_KAFKA_LOG_WARNING 4
+#define FLB_KAFKA_LOG_NOTICE 5
+#define FLB_KAFKA_LOG_INFO 6
+#define FLB_KAFKA_LOG_DEBUG 7
+
+#define FLB_JSON_DATE_DOUBLE 0
+#define FLB_JSON_DATE_ISO8601 1
+#define FLB_JSON_DATE_ISO8601_NS 2
+#define FLB_JSON_DATE_ISO8601_FMT "%Y-%m-%dT%H:%M:%S"
+
+struct flb_kafka_topic {
+ int name_len;
+ char *name;
+ rd_kafka_topic_t *tp;
+ struct mk_list _head;
+};
+
+struct flb_out_kafka {
+ struct flb_kafka kafka;
+ /* Config Parameters */
+ int format;
+ flb_sds_t format_str;
+
+ /* Optional topic key for routing */
+ int topic_key_len;
+ char *topic_key;
+
+ int timestamp_key_len;
+ char *timestamp_key;
+ int timestamp_format;
+ flb_sds_t timestamp_format_str;
+
+ int message_key_len;
+ char *message_key;
+
+ int message_key_field_len;
+ char *message_key_field;
+
+ /* Gelf Keys */
+ struct flb_gelf_fields gelf_fields;
+
+ /* Head of defined topics by configuration */
+ struct mk_list topics;
+
+ /*
+ * Blocked Status: since rdkafka have it own buffering queue, there is a
+ * chance that the queue becomes full, when that happens our default
+ * behavior is the following:
+ *
+ * - out_kafka yields and try to continue every second until it succeed. In
+ * the meanwhile blocked flag gets FLB_TRUE value.
+ * - when flushing more records and blocked == FLB_TRUE, issue
+ * a retry.
+ */
+ int blocked;
+
+ int dynamic_topic;
+
+ int queue_full_retries;
+
+ /* Internal */
+ rd_kafka_conf_t *conf;
+
+ /* Plugin instance */
+ struct flb_output_instance *ins;
+
+#ifdef FLB_HAVE_AVRO_ENCODER
+ // avro serialization requires a schema
+ // the schema is stored in json in avro_schema_str
+ //
+ // optionally the schema ID can be stashed in the avro data stream
+ // the schema ID is stored in avro_schema_id
+ // this is common at this time with large kafka installations and schema registries
+ // flb_sds_t avro_schema_str;
+ // flb_sds_t avro_schema_id;
+ struct flb_avro_fields avro_fields;
+#endif
+
+};
+
+struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
+ struct flb_config *config);
+int flb_out_kafka_destroy(struct flb_out_kafka *ctx);
+
+#endif
diff --git a/fluent-bit/plugins/out_kafka/kafka_topic.c b/fluent-bit/plugins/out_kafka/kafka_topic.c
new file mode 100644
index 000000000..2db8698b1
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka_topic.c
@@ -0,0 +1,120 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+
+#include "kafka_config.h"
+#include "rdkafka.h"
+
+struct flb_kafka_topic *flb_kafka_topic_create(char *name,
+ struct flb_out_kafka *ctx)
+{
+ rd_kafka_topic_t *tp;
+ struct flb_kafka_topic *topic;
+
+ tp = rd_kafka_topic_new(ctx->kafka.rk, name, NULL);
+ if (!tp) {
+ flb_plg_error(ctx->ins, "failed to create topic: %s",
+ rd_kafka_err2str(rd_kafka_last_error()));
+ return NULL;
+ }
+
+ topic = flb_malloc(sizeof(struct flb_kafka_topic));
+ if (!topic) {
+ flb_errno();
+ return NULL;
+ }
+
+ topic->name = flb_strdup(name);
+ topic->name_len = strlen(name);
+ topic->tp = tp;
+ mk_list_add(&topic->_head, &ctx->topics);
+
+ return topic;
+}
+
+int flb_kafka_topic_destroy(struct flb_kafka_topic *topic,
+ struct flb_out_kafka *ctx)
+{
+ mk_list_del(&topic->_head);
+ rd_kafka_topic_destroy(topic->tp);
+ flb_free(topic->name);
+ flb_free(topic);
+
+ return 0;
+}
+
+int flb_kafka_topic_destroy_all(struct flb_out_kafka *ctx)
+{
+ int c = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_kafka_topic *topic;
+
+ mk_list_foreach_safe(head, tmp, &ctx->topics) {
+ topic = mk_list_entry(head, struct flb_kafka_topic, _head);
+ flb_kafka_topic_destroy(topic, ctx);
+ c++;
+ }
+
+ return c;
+}
+
+/* Get first topic of the list (default topic) */
+struct flb_kafka_topic *flb_kafka_topic_default(struct flb_out_kafka *ctx)
+{
+ struct flb_kafka_topic *topic;
+
+ if (mk_list_is_empty(&ctx->topics) == 0) {
+ return NULL;
+ }
+
+ topic = mk_list_entry_first(&ctx->topics, struct flb_kafka_topic,
+ _head);
+ return topic;
+}
+
+struct flb_kafka_topic *flb_kafka_topic_lookup(char *name,
+ int name_len,
+ struct flb_out_kafka *ctx)
+{
+ struct mk_list *head;
+ struct flb_kafka_topic *topic;
+
+ if (!ctx->topic_key) {
+ return flb_kafka_topic_default(ctx);
+ }
+
+ mk_list_foreach(head, &ctx->topics) {
+ topic = mk_list_entry(head, struct flb_kafka_topic, _head);
+ if (topic->name_len != name_len) {
+ continue;
+ }
+
+ if (strncmp(name, topic->name, topic->name_len) == 0) {
+ return topic;
+ }
+ }
+
+ /* No matches, return the default topic */
+ return flb_kafka_topic_default(ctx);
+
+}
diff --git a/fluent-bit/plugins/out_kafka/kafka_topic.h b/fluent-bit/plugins/out_kafka/kafka_topic.h
new file mode 100644
index 000000000..9b1203b96
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka/kafka_topic.h
@@ -0,0 +1,34 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_KAFKA_TOPIC_H
+#define FLB_KAFKA_TOPIC_H
+
+struct flb_kafka_topic *flb_kafka_topic_create(char *name,
+ struct flb_out_kafka *ctx);
+int flb_kafka_topic_destroy(struct flb_kafka_topic *topic,
+ struct flb_out_kafka *ctx);
+int flb_kafka_topic_destroy_all(struct flb_out_kafka *ctx);
+struct flb_kafka_topic *flb_kafka_topic_default(struct flb_out_kafka *ctx);
+
+struct flb_kafka_topic *flb_kafka_topic_lookup(char *name,
+ int name_len,
+ struct flb_out_kafka *ctx);
+
+#endif