summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_kafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_kafka.c')
-rw-r--r--fluent-bit/src/flb_kafka.c216
1 files changed, 0 insertions, 216 deletions
diff --git a/fluent-bit/src/flb_kafka.c b/fluent-bit/src/flb_kafka.c
deleted file mode 100644
index a3b69a9c8..000000000
--- a/fluent-bit/src/flb_kafka.c
+++ /dev/null
@@ -1,216 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2019-2021 The Fluent Bit Authors
- * Copyright (C) 2015-2018 Treasure Data Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fluent-bit/flb_config.h"
-#include "fluent-bit/flb_mem.h"
-#include "fluent-bit/flb_str.h"
-#include "fluent-bit/flb_utils.h"
-#include "monkey/mk_core/mk_list.h"
-#include <fluent-bit/flb_kafka.h>
-#include <fluent-bit/flb_kv.h>
-
-#include <rdkafka.h>
-
-rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
- struct mk_list *properties,
- int with_group_id)
-{
- struct mk_list *head;
- struct flb_kv *kv;
- const char *conf;
- rd_kafka_conf_t *kafka_cfg;
- char errstr[512];
-
- kafka_cfg = rd_kafka_conf_new();
- if (!kafka_cfg) {
- flb_error("[flb_kafka] Could not initialize kafka config object");
- goto err;
- }
-
- conf = flb_config_prop_get("client_id", properties);
- if (!conf) {
- conf = "fluent-bit";
- }
- if (rd_kafka_conf_set(kafka_cfg, "client.id", conf,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- flb_error("[flb_kafka] cannot configure client id: %s", errstr);
- }
-
- if (with_group_id) {
- conf = flb_config_prop_get("group_id", properties);
- if (!conf) {
- conf = "fluent-bit";
- }
- if (rd_kafka_conf_set(kafka_cfg, "group.id", conf,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- flb_error("[flb_kafka] cannot configure group id: %s", errstr);
- }
- }
-
- conf = flb_config_prop_get("brokers", properties);
- if (conf) {
- if (rd_kafka_conf_set(kafka_cfg, "bootstrap.servers", conf,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- flb_error("[flb_kafka] failed to configure brokers: %s", errstr);
- goto err;
- }
- kafka->brokers = flb_strdup(conf);
- }
- else {
- flb_error("config: no brokers defined");
- goto err;
- }
-
- /* Iterate custom rdkafka properties */
- mk_list_foreach(head, properties) {
- kv = mk_list_entry(head, struct flb_kv, _head);
- if (strncasecmp(kv->key, "rdkafka.", 8) == 0 &&
- flb_sds_len(kv->key) > 8) {
- if (rd_kafka_conf_set(kafka_cfg, kv->key + 8, kv->val,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- flb_error("[flb_kafka] cannot configure '%s' property", kv->key + 8);
- }
- }
- }
-
- return kafka_cfg;
-
-err:
- if (kafka_cfg) {
- flb_free(kafka_cfg);
- }
- return NULL;
-}
-
-static int add_topic_partitions(rd_kafka_topic_partition_list_t *list,
- const char *topic_str,
- const char *partitions_str)
-{
- int ret = -1;
- struct mk_list *split;
- char *str, *end;
- int start, stop;
- size_t len;
- split = flb_utils_split(partitions_str, '-', -1);
- if (!split) {
- flb_error("[flb_kafka] Failed to split partitions string");
- goto end;
- }
-
- len = mk_list_size(split);
- if (len == 1) {
- str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
- start = strtol(str, &end, 10);
- if (end == str || *end != '\0') {
- flb_error("[flb_kafka] invalid partition \"%s\"", str);
- goto end;
- }
- // single partition
- rd_kafka_topic_partition_list_add(list, topic_str, start);
- } else if (len == 2) {
- str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
- start = strtol(str, &end, 10);
- if (end == str || *end != '\0') {
- flb_error("[flb_kafka] invalid partition \"%s\"", str);
- goto end;
- }
- str = mk_list_entry(split->next->next, struct flb_split_entry, _head)->value;
- stop = strtol(str, &end, 10);
- if (end == str || *end != '\0') {
- flb_error("[flb_kafka] invalid partition \"%s\"", str);
- goto end;
- }
- rd_kafka_topic_partition_list_add_range(list, topic_str, start, stop);
- } else {
- flb_error("[flb_kafka] invalid partition range string \"%s\"", partitions_str);
- goto end;
- }
-
- ret = 0;
-
-end:
- if (split) {
- flb_utils_split_free(split);
- }
- return ret;
-}
-
-rd_kafka_topic_partition_list_t *flb_kafka_parse_topics(const char *topics_str)
-{
- rd_kafka_topic_partition_list_t *ret;
- struct mk_list *split = NULL;
- struct mk_list *partitions = NULL;
- struct mk_list *curr;
- struct flb_split_entry *entry;
- struct flb_split_entry *topic_entry;
- struct flb_split_entry *partitions_entry;
- size_t len;
-
- ret = rd_kafka_topic_partition_list_new(1);
- if (!ret) {
- flb_error("[flb_kafka] Failed to allocate topic list");
- goto err;
- }
-
- split = flb_utils_split(topics_str, ',', -1);
- if (!split) {
- flb_error("[flb_kafka] Failed to split topics string");
- goto err;
- }
-
- mk_list_foreach(curr, split) {
- entry = mk_list_entry(curr, struct flb_split_entry, _head);
- partitions = flb_utils_split(entry->value, ':', -1);
- if (!partitions) {
- flb_error("[flb_kafka] Failed to split topic string");
- goto err;
- }
- len = mk_list_size(partitions);
- if (len == 1) {
- rd_kafka_topic_partition_list_add(ret, entry->value, 0);
- } else if (len == 2) {
- topic_entry = mk_list_entry(
- partitions->next, struct flb_split_entry, _head);
- partitions_entry = mk_list_entry(
- partitions->next->next, struct flb_split_entry, _head);
- if (add_topic_partitions(ret, topic_entry->value, partitions_entry->value)) {
- goto err;
- }
- } else {
- flb_error("[flb_kafka] Failed to parse topic/partition string");
- goto err;
- }
- flb_utils_split_free(partitions);
- }
- flb_utils_split_free(split);
- return ret;
-
-err:
- if (ret) {
- rd_kafka_topic_partition_list_destroy(ret);
- }
- if (split) {
- flb_utils_split_free(split);
- }
- if (partitions) {
- flb_utils_split_free(partitions);
- }
- return NULL;
-}