summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_kafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_kafka.c')
-rw-r--r--fluent-bit/src/flb_kafka.c216
1 files changed, 216 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_kafka.c b/fluent-bit/src/flb_kafka.c
new file mode 100644
index 00000000..a3b69a9c
--- /dev/null
+++ b/fluent-bit/src/flb_kafka.c
@@ -0,0 +1,216 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2019-2021 The Fluent Bit Authors
+ * Copyright (C) 2015-2018 Treasure Data Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fluent-bit/flb_config.h"
+#include "fluent-bit/flb_mem.h"
+#include "fluent-bit/flb_str.h"
+#include "fluent-bit/flb_utils.h"
+#include "monkey/mk_core/mk_list.h"
+#include <fluent-bit/flb_kafka.h>
+#include <fluent-bit/flb_kv.h>
+
+#include <rdkafka.h>
+
+rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
+ struct mk_list *properties,
+ int with_group_id)
+{
+ struct mk_list *head;
+ struct flb_kv *kv;
+ const char *conf;
+ rd_kafka_conf_t *kafka_cfg;
+ char errstr[512];
+
+ kafka_cfg = rd_kafka_conf_new();
+ if (!kafka_cfg) {
+ flb_error("[flb_kafka] Could not initialize kafka config object");
+ goto err;
+ }
+
+ conf = flb_config_prop_get("client_id", properties);
+ if (!conf) {
+ conf = "fluent-bit";
+ }
+ if (rd_kafka_conf_set(kafka_cfg, "client.id", conf,
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ flb_error("[flb_kafka] cannot configure client id: %s", errstr);
+ }
+
+ if (with_group_id) {
+ conf = flb_config_prop_get("group_id", properties);
+ if (!conf) {
+ conf = "fluent-bit";
+ }
+ if (rd_kafka_conf_set(kafka_cfg, "group.id", conf,
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ flb_error("[flb_kafka] cannot configure group id: %s", errstr);
+ }
+ }
+
+ conf = flb_config_prop_get("brokers", properties);
+ if (conf) {
+ if (rd_kafka_conf_set(kafka_cfg, "bootstrap.servers", conf,
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ flb_error("[flb_kafka] failed to configure brokers: %s", errstr);
+ goto err;
+ }
+ kafka->brokers = flb_strdup(conf);
+ }
+ else {
+ flb_error("config: no brokers defined");
+ goto err;
+ }
+
+ /* Iterate custom rdkafka properties */
+ mk_list_foreach(head, properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ if (strncasecmp(kv->key, "rdkafka.", 8) == 0 &&
+ flb_sds_len(kv->key) > 8) {
+ if (rd_kafka_conf_set(kafka_cfg, kv->key + 8, kv->val,
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ flb_error("[flb_kafka] cannot configure '%s' property", kv->key + 8);
+ }
+ }
+ }
+
+ return kafka_cfg;
+
+err:
+ if (kafka_cfg) {
+ flb_free(kafka_cfg);
+ }
+ return NULL;
+}
+
+static int add_topic_partitions(rd_kafka_topic_partition_list_t *list,
+ const char *topic_str,
+ const char *partitions_str)
+{
+ int ret = -1;
+ struct mk_list *split;
+ char *str, *end;
+ int start, stop;
+ size_t len;
+ split = flb_utils_split(partitions_str, '-', -1);
+ if (!split) {
+ flb_error("[flb_kafka] Failed to split partitions string");
+ goto end;
+ }
+
+ len = mk_list_size(split);
+ if (len == 1) {
+ str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
+ start = strtol(str, &end, 10);
+ if (end == str || *end != '\0') {
+ flb_error("[flb_kafka] invalid partition \"%s\"", str);
+ goto end;
+ }
+ // single partition
+ rd_kafka_topic_partition_list_add(list, topic_str, start);
+ } else if (len == 2) {
+ str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
+ start = strtol(str, &end, 10);
+ if (end == str || *end != '\0') {
+ flb_error("[flb_kafka] invalid partition \"%s\"", str);
+ goto end;
+ }
+ str = mk_list_entry(split->next->next, struct flb_split_entry, _head)->value;
+ stop = strtol(str, &end, 10);
+ if (end == str || *end != '\0') {
+ flb_error("[flb_kafka] invalid partition \"%s\"", str);
+ goto end;
+ }
+ rd_kafka_topic_partition_list_add_range(list, topic_str, start, stop);
+ } else {
+ flb_error("[flb_kafka] invalid partition range string \"%s\"", partitions_str);
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ if (split) {
+ flb_utils_split_free(split);
+ }
+ return ret;
+}
+
+rd_kafka_topic_partition_list_t *flb_kafka_parse_topics(const char *topics_str)
+{
+ rd_kafka_topic_partition_list_t *ret;
+ struct mk_list *split = NULL;
+ struct mk_list *partitions = NULL;
+ struct mk_list *curr;
+ struct flb_split_entry *entry;
+ struct flb_split_entry *topic_entry;
+ struct flb_split_entry *partitions_entry;
+ size_t len;
+
+ ret = rd_kafka_topic_partition_list_new(1);
+ if (!ret) {
+ flb_error("[flb_kafka] Failed to allocate topic list");
+ goto err;
+ }
+
+ split = flb_utils_split(topics_str, ',', -1);
+ if (!split) {
+ flb_error("[flb_kafka] Failed to split topics string");
+ goto err;
+ }
+
+ mk_list_foreach(curr, split) {
+ entry = mk_list_entry(curr, struct flb_split_entry, _head);
+ partitions = flb_utils_split(entry->value, ':', -1);
+ if (!partitions) {
+ flb_error("[flb_kafka] Failed to split topic string");
+ goto err;
+ }
+ len = mk_list_size(partitions);
+ if (len == 1) {
+ rd_kafka_topic_partition_list_add(ret, entry->value, 0);
+ } else if (len == 2) {
+ topic_entry = mk_list_entry(
+ partitions->next, struct flb_split_entry, _head);
+ partitions_entry = mk_list_entry(
+ partitions->next->next, struct flb_split_entry, _head);
+ if (add_topic_partitions(ret, topic_entry->value, partitions_entry->value)) {
+ goto err;
+ }
+ } else {
+ flb_error("[flb_kafka] Failed to parse topic/partition string");
+ goto err;
+ }
+ flb_utils_split_free(partitions);
+ }
+ flb_utils_split_free(split);
+ return ret;
+
+err:
+ if (ret) {
+ rd_kafka_topic_partition_list_destroy(ret);
+ }
+ if (split) {
+ flb_utils_split_free(split);
+ }
+ if (partitions) {
+ flb_utils_split_free(partitions);
+ }
+ return NULL;
+}