diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_kafka/kafka_topic.c')
-rw-r--r-- | src/fluent-bit/plugins/out_kafka/kafka_topic.c | 120 |
1 files changed, 0 insertions, 120 deletions
diff --git a/src/fluent-bit/plugins/out_kafka/kafka_topic.c b/src/fluent-bit/plugins/out_kafka/kafka_topic.c deleted file mode 100644 index 2db8698b1..000000000 --- a/src/fluent-bit/plugins/out_kafka/kafka_topic.c +++ /dev/null @@ -1,120 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/flb_mem.h> - -#include "kafka_config.h" -#include "rdkafka.h" - -struct flb_kafka_topic *flb_kafka_topic_create(char *name, - struct flb_out_kafka *ctx) -{ - rd_kafka_topic_t *tp; - struct flb_kafka_topic *topic; - - tp = rd_kafka_topic_new(ctx->kafka.rk, name, NULL); - if (!tp) { - flb_plg_error(ctx->ins, "failed to create topic: %s", - rd_kafka_err2str(rd_kafka_last_error())); - return NULL; - } - - topic = flb_malloc(sizeof(struct flb_kafka_topic)); - if (!topic) { - flb_errno(); - return NULL; - } - - topic->name = flb_strdup(name); - topic->name_len = strlen(name); - topic->tp = tp; - mk_list_add(&topic->_head, &ctx->topics); - - return topic; -} - -int flb_kafka_topic_destroy(struct flb_kafka_topic *topic, - struct flb_out_kafka *ctx) -{ - mk_list_del(&topic->_head); - rd_kafka_topic_destroy(topic->tp); - flb_free(topic->name); - flb_free(topic); - - return 0; -} - -int flb_kafka_topic_destroy_all(struct flb_out_kafka *ctx) -{ - int c = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_kafka_topic *topic; - - mk_list_foreach_safe(head, tmp, &ctx->topics) { - topic = mk_list_entry(head, struct flb_kafka_topic, _head); - flb_kafka_topic_destroy(topic, ctx); - c++; - } - - return c; -} - -/* Get first topic of the list (default topic) */ -struct flb_kafka_topic *flb_kafka_topic_default(struct flb_out_kafka *ctx) -{ - struct flb_kafka_topic *topic; - - if (mk_list_is_empty(&ctx->topics) == 0) { - return NULL; - } - - topic = mk_list_entry_first(&ctx->topics, struct flb_kafka_topic, - _head); - return topic; -} - -struct flb_kafka_topic *flb_kafka_topic_lookup(char *name, - int name_len, - struct flb_out_kafka *ctx) -{ - struct mk_list *head; - struct flb_kafka_topic *topic; - - if (!ctx->topic_key) { - return flb_kafka_topic_default(ctx); - } - - mk_list_foreach(head, &ctx->topics) { - topic = mk_list_entry(head, struct flb_kafka_topic, _head); - if (topic->name_len != name_len) { - continue; - } - - if (strncmp(name, topic->name, topic->name_len) == 0) { - return topic; - } - } - - /* No matches, return the default topic */ - return flb_kafka_topic_default(ctx); - -} |