summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_kafka_rest
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/out_kafka_rest
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_kafka_rest')
-rw-r--r--fluent-bit/plugins/out_kafka_rest/CMakeLists.txt5
-rw-r--r--fluent-bit/plugins/out_kafka_rest/kafka.c351
-rw-r--r--fluent-bit/plugins/out_kafka_rest/kafka.h66
-rw-r--r--fluent-bit/plugins/out_kafka_rest/kafka_conf.c223
-rw-r--r--fluent-bit/plugins/out_kafka_rest/kafka_conf.h33
5 files changed, 678 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt b/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt
new file mode 100644
index 000000000..39df92f77
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(src
+ kafka_conf.c
+ kafka.c)
+
+FLB_PLUGIN(out_kafka_rest "${src}" "")
diff --git a/fluent-bit/plugins/out_kafka_rest/kafka.c b/fluent-bit/plugins/out_kafka_rest/kafka.c
new file mode 100644
index 000000000..f3b6153a6
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka_rest/kafka.c
@@ -0,0 +1,351 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <msgpack.h>
+
+#include "kafka.h"
+#include "kafka_conf.h"
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "message_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key),
+ "Specify a message key. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "time_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key),
+ "Specify the name of the field that holds the record timestamp. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "topic", "fluent-bit",
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, topic),
+ "Specify the kafka topic. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "url_path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, url_path),
+ "Specify an optional HTTP URL path for the target web server, e.g: /something"
+ },
+
+ {
+ FLB_CONFIG_MAP_DOUBLE, "partition", "-1",
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, partition),
+ "Specify kafka partition number. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "time_key_format", FLB_KAFKA_TIME_KEYF,
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key_format),
+ "Specify the format of the timestamp. "
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, include_tag_key),
+ "Specify whether to append tag name to final record. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "tag_key", "_flb-key",
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, tag_key),
+ "Specify the key name of the record if include_tag_key is enabled. "
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "avro_http_header", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kafka_rest, avro_http_header),
+ "Specify if the format has avro header in http request"
+ },
+
+ /* EOF */
+ {0}
+};
+/*
+ * Convert the internal Fluent Bit data representation to the required
+ * one by Kafka REST Proxy.
+ */
+static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ size_t *out_size,
+ struct flb_kafka_rest *ctx)
+{
+ int i;
+ int len;
+ int arr_size = 0;
+ int map_size;
+ size_t s;
+ flb_sds_t out_buf;
+ char time_formatted[256];
+ msgpack_object map;
+ msgpack_object key;
+ msgpack_object val;
+ struct tm tm;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return NULL;
+ }
+
+ /* Init temporary buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Count number of entries */
+ arr_size = flb_mp_count(data, bytes);
+
+ /* Root map */
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "records", 7);
+
+ msgpack_pack_array(&mp_pck, arr_size);
+
+ /* Iterate and compose array content */
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ map = *log_event.body;
+ map_size = 1;
+
+ if (ctx->partition >= 0) {
+ map_size++;
+ }
+
+ if (ctx->message_key != NULL) {
+ map_size++;
+ }
+
+ msgpack_pack_map(&mp_pck, map_size);
+ if (ctx->partition >= 0) {
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "partition", 9);
+ msgpack_pack_int64(&mp_pck, ctx->partition);
+ }
+
+
+ if (ctx->message_key != NULL) {
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "key", 3);
+ msgpack_pack_str(&mp_pck, ctx->message_key_len);
+ msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len);
+ }
+
+ /* Value Map Size */
+ map_size = map.via.map.size;
+ map_size++;
+ if (ctx->include_tag_key == FLB_TRUE) {
+ map_size++;
+ }
+
+ msgpack_pack_str(&mp_pck, 5);
+ msgpack_pack_str_body(&mp_pck, "value", 5);
+
+ msgpack_pack_map(&mp_pck, map_size);
+
+ /* Time key and time formatted */
+ msgpack_pack_str(&mp_pck, ctx->time_key_len);
+ msgpack_pack_str_body(&mp_pck, ctx->time_key, ctx->time_key_len);
+
+ /* Format the time */
+ gmtime_r(&log_event.timestamp.tm.tv_sec, &tm);
+ s = strftime(time_formatted, sizeof(time_formatted) - 1,
+ ctx->time_key_format, &tm);
+ len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+ ".%09" PRIu64 "Z", (uint64_t) log_event.timestamp.tm.tv_nsec);
+ s += len;
+ msgpack_pack_str(&mp_pck, s);
+ msgpack_pack_str_body(&mp_pck, time_formatted, s);
+
+ /* Tag Key */
+ if (ctx->include_tag_key == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, ctx->tag_key_len);
+ msgpack_pack_str_body(&mp_pck, ctx->tag_key, ctx->tag_key_len);
+ msgpack_pack_str(&mp_pck, tag_len);
+ msgpack_pack_str_body(&mp_pck, tag, tag_len);
+ }
+
+ for (i = 0; i < map.via.map.size; i++) {
+ key = map.via.map.ptr[i].key;
+ val = map.via.map.ptr[i].val;
+
+ msgpack_pack_object(&mp_pck, key);
+ msgpack_pack_object(&mp_pck, val);
+ }
+ }
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ /* Convert to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!out_buf) {
+ return NULL;
+ }
+
+ *out_size = flb_sds_len(out_buf);
+
+ return out_buf;
+}
+
+static int cb_kafka_init(struct flb_output_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ (void) ins;
+ (void) config;
+ (void) data;
+ struct flb_kafka_rest *ctx;
+
+ ctx = flb_kr_conf_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "cannot initialize plugin");
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "host=%s port=%i",
+ ins->host.name, ins->host.port);
+ flb_output_set_context(ins, ctx);
+
+ return 0;
+}
+
+static void cb_kafka_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ flb_sds_t js;
+ size_t js_size;
+ size_t b_sent;
+ struct flb_http_client *c;
+ struct flb_connection *u_conn;
+ struct flb_kafka_rest *ctx = out_context;
+ (void) i_ins;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Convert format */
+ js = kafka_rest_format(event_chunk->data, event_chunk->size,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ &js_size, ctx);
+ if (!js) {
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
+ js, js_size, NULL, 0, NULL, 0);
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ if (ctx->avro_http_header == FLB_TRUE) {
+ flb_http_add_header(c,
+ "Content-Type", 12,
+ "application/vnd.kafka.avro.v2+json", 34);
+ }
+ else {
+ flb_http_add_header(c,
+ "Content-Type", 12,
+ "application/vnd.kafka.json.v2+json", 34);
+ }
+
+ if (ctx->http_user && ctx->http_passwd) {
+ flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
+ }
+
+ ret = flb_http_do(c, &b_sent);
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "http_do=%i", ret);
+ goto retry;
+ }
+ else {
+ /* The request was issued successfully, validate the 'error' field */
+ flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
+ if (c->resp.status != 200) {
+ if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "Kafka REST response\n%s",
+ c->resp.payload);
+ }
+ goto retry;
+ }
+
+ if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "Kafka REST response\n%s",
+ c->resp.payload);
+ }
+ else {
+ goto retry;
+ }
+ }
+
+ /* Cleanup */
+ flb_http_client_destroy(c);
+ flb_sds_destroy(js);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_OK);
+
+ /* Issue a retry */
+ retry:
+ flb_http_client_destroy(c);
+ flb_sds_destroy(js);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+}
+
+static int cb_kafka_exit(void *data, struct flb_config *config)
+{
+ struct flb_kafka_rest *ctx = data;
+
+ flb_kr_conf_destroy(ctx);
+ return 0;
+}
+
+struct flb_output_plugin out_kafka_rest_plugin = {
+ .name = "kafka-rest",
+ .description = "Kafka REST Proxy",
+ .cb_init = cb_kafka_init,
+ .cb_flush = cb_kafka_flush,
+ .cb_exit = cb_kafka_exit,
+ .config_map = config_map,
+ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
+};
diff --git a/fluent-bit/plugins/out_kafka_rest/kafka.h b/fluent-bit/plugins/out_kafka_rest/kafka.h
new file mode 100644
index 000000000..c2d220e7d
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka_rest/kafka.h
@@ -0,0 +1,66 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_KAFKA_REST_H
+#define FLB_OUT_KAFKA_REST_H
+
+#define FLB_KAFKA_TIME_KEY "@timestamp"
+#define FLB_KAFKA_TIME_KEYF "%Y-%m-%dT%H:%M:%S"
+#define FLB_KAFKA_TAG_KEY "_flb-key"
+
+struct flb_kafka_rest {
+ /* Kafka specifics */
+ long partition;
+ char *topic;
+ int message_key_len;
+ char *message_key;
+
+ /* HTTP Auth */
+ char *http_user;
+ char *http_passwd;
+
+ /* time key */
+ int time_key_len;
+ char *time_key;
+
+ /* time key format */
+ int time_key_format_len;
+ char *time_key_format;
+
+ /* include_tag_key */
+ int include_tag_key;
+ int tag_key_len;
+ char *tag_key;
+
+ /* HTTP URI */
+ char uri[256];
+ char *url_path;
+
+ /* Upstream connection to the backend server */
+ struct flb_upstream *u;
+
+ /* Plugin instance */
+ struct flb_output_instance *ins;
+
+ /* Avro http header*/
+ int avro_http_header;
+};
+
+
+#endif
diff --git a/fluent-bit/plugins/out_kafka_rest/kafka_conf.c b/fluent-bit/plugins/out_kafka_rest/kafka_conf.c
new file mode 100644
index 000000000..3df50eb8b
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka_rest/kafka_conf.c
@@ -0,0 +1,223 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "kafka.h"
+#include "kafka_conf.h"
+
+struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ long part;
+ int io_flags = 0;
+ const char *tmp;
+ char *endptr;
+ struct flb_upstream *upstream;
+ struct flb_kafka_rest *ctx;
+ int ret;
+
+ /* Allocate context */
+ ctx = flb_calloc(1, sizeof(struct flb_kafka_rest));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Get network configuration */
+ flb_output_net_default("127.0.0.1", 8082, ins);
+
+ /* use TLS ? */
+ if (ins->use_tls == FLB_TRUE) {
+ io_flags = FLB_IO_TLS;
+ }
+ else {
+ io_flags = FLB_IO_TCP;
+ }
+
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ /* Prepare an upstream handler */
+ upstream = flb_upstream_create(config,
+ ins->host.name,
+ ins->host.port,
+ io_flags,
+ ins->tls);
+ if (!upstream) {
+ flb_plg_error(ctx->ins, "cannot create Upstream context");
+ flb_kr_conf_destroy(ctx);
+ return NULL;
+ }
+ ctx->u = upstream;
+ flb_output_upstream_set(ctx->u, ins);
+
+ flb_output_upstream_set(ctx->u, ins);
+
+ /* HTTP Auth */
+ tmp = flb_output_get_property("http_user", ins);
+ if (tmp) {
+ ctx->http_user = flb_strdup(tmp);
+
+ tmp = flb_output_get_property("http_passwd", ins);
+ if (tmp) {
+ ctx->http_passwd = flb_strdup(tmp);
+ }
+ else {
+ ctx->http_passwd = flb_strdup("");
+ }
+ }
+
+ /* Time Key */
+ tmp = flb_output_get_property("time_key", ins);
+ if (tmp) {
+ ctx->time_key = flb_strdup(tmp);
+ ctx->time_key_len = strlen(tmp);
+ }
+ else {
+ ctx->time_key = flb_strdup(FLB_KAFKA_TIME_KEY);
+ ctx->time_key_len = sizeof(FLB_KAFKA_TIME_KEY) - 1;
+ }
+
+ /* Time Key Format */
+ tmp = flb_output_get_property("time_key_format", ins);
+ if (tmp) {
+ ctx->time_key_format = flb_strdup(tmp);
+ ctx->time_key_format_len = strlen(tmp);
+ }
+ else {
+ ctx->time_key_format = flb_strdup(FLB_KAFKA_TIME_KEYF);
+ ctx->time_key_format_len = sizeof(FLB_KAFKA_TIME_KEYF) - 1;
+ }
+
+ /* Include Tag key */
+ tmp = flb_output_get_property("include_tag_key", ins);
+ if (tmp) {
+ ctx->include_tag_key = flb_utils_bool(tmp);
+ }
+ else {
+ ctx->include_tag_key = FLB_FALSE;
+ }
+
+ /* Tag Key */
+ if (ctx->include_tag_key == FLB_TRUE) {
+ tmp = flb_output_get_property("tag_key", ins);
+ if (tmp) {
+ ctx->tag_key = flb_strdup(tmp);
+ ctx->tag_key_len = strlen(tmp);
+ if (tmp[0] != '_') {
+ flb_plg_warn(ctx->ins, "consider use a tag_key "
+ "that starts with '_'");
+ }
+ }
+ else {
+ ctx->tag_key = flb_strdup(FLB_KAFKA_TAG_KEY);
+ ctx->tag_key_len = sizeof(FLB_KAFKA_TAG_KEY) - 1;
+ }
+ }
+
+ /* Kafka: partition */
+ tmp = flb_output_get_property("partition", ins);
+ if (tmp) {
+ errno = 0;
+ part = strtol(tmp, &endptr, 10);
+ if ((errno == ERANGE && (part == LONG_MAX || part == LONG_MIN))
+ || (errno != 0 && part == 0)) {
+ flb_plg_error(ctx->ins, "invalid partition number");
+ }
+
+ if (endptr == tmp) {
+ flb_plg_error(ctx->ins, "invalid partition number");
+ }
+ ctx->partition = part;
+ }
+ else {
+ ctx->partition = -1;
+ }
+
+ /* Kafka: topic */
+ tmp = flb_output_get_property("topic", ins);
+ if (tmp) {
+ ctx->topic = flb_strdup(tmp);
+ }
+ else {
+ ctx->topic = flb_strdup("fluent-bit");
+ }
+
+ /* Set partition based on topic */
+ tmp = flb_output_get_property("url_path", ins);
+ if (tmp) {
+ ctx->url_path = flb_strdup(tmp);
+ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/topics/%s", ctx->url_path, ctx->topic);
+ } else {
+ ctx->url_path = NULL;
+ snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/topics/%s", ctx->topic);
+ }
+
+ /* Kafka: message key */
+ tmp = flb_output_get_property("message_key", ins);
+ if (tmp) {
+ ctx->message_key = flb_strdup(tmp);
+ ctx->message_key_len = strlen(tmp);
+ }
+ else {
+ ctx->message_key = NULL;
+ ctx->message_key_len = 0;
+ }
+
+ return ctx;
+}
+
+int flb_kr_conf_destroy(struct flb_kafka_rest *ctx)
+{
+ flb_free(ctx->topic);
+ flb_free(ctx->http_user);
+ flb_free(ctx->http_passwd);
+
+ flb_free(ctx->time_key);
+ flb_free(ctx->time_key_format);
+
+ if (ctx->url_path) {
+ flb_free(ctx->url_path);
+ }
+
+ if (ctx->include_tag_key) {
+ flb_free(ctx->tag_key);
+ }
+
+ if (ctx->message_key) {
+ flb_free(ctx->message_key);
+ }
+
+ flb_upstream_destroy(ctx->u);
+ flb_free(ctx);
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/out_kafka_rest/kafka_conf.h b/fluent-bit/plugins/out_kafka_rest/kafka_conf.h
new file mode 100644
index 000000000..1d80445b3
--- /dev/null
+++ b/fluent-bit/plugins/out_kafka_rest/kafka_conf.h
@@ -0,0 +1,33 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_KAFKA_REST_CONF_H
+#define FLB_OUT_KAFKA_REST_CONF_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_output.h>
+
+#include "kafka.h"
+
+struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
+ struct flb_config *config);
+int flb_kr_conf_destroy(struct flb_kafka_rest *ctx);
+
+#endif