diff options
Diffstat (limited to 'fluent-bit/plugins/out_kafka')
-rw-r--r-- | fluent-bit/plugins/out_kafka/CMakeLists.txt | 8 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka.c | 658 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka_callbacks.h | 31 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka_config.c | 253 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka_config.h | 129 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka_topic.c | 120 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka/kafka_topic.h | 34 |
7 files changed, 1233 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka/CMakeLists.txt b/fluent-bit/plugins/out_kafka/CMakeLists.txt new file mode 100644 index 000000000..526910d49 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/CMakeLists.txt @@ -0,0 +1,8 @@ +# Fluent Bit Kafka Output plugin +set(src + kafka_config.c + kafka_topic.c + kafka.c) + +FLB_PLUGIN(out_kafka "${src}" "rdkafka") +target_link_libraries(flb-plugin-out_kafka -lpthread) diff --git a/fluent-bit/plugins/out_kafka/kafka.c b/fluent-bit/plugins/out_kafka/kafka.c new file mode 100644 index 000000000..ff700a687 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka.c @@ -0,0 +1,658 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_log_event_decoder.h> + +#include "kafka_config.h" +#include "kafka_topic.h" + +void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, + void *opaque) +{ + struct flb_out_kafka *ctx = (struct flb_out_kafka *) opaque; + + if (rkmessage->err) { + flb_plg_warn(ctx->ins, "message delivery failed: %s", + rd_kafka_err2str(rkmessage->err)); + } + else { + flb_plg_debug(ctx->ins, "message delivered (%zd bytes, " + "partition %"PRId32")", + rkmessage->len, rkmessage->partition); + } +} + +void cb_kafka_logger(const rd_kafka_t *rk, int level, + const char *fac, const char *buf) +{ + struct flb_out_kafka *ctx; + + ctx = (struct flb_out_kafka *) rd_kafka_opaque(rk); + + if (level <= FLB_KAFKA_LOG_ERR) { + flb_plg_error(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_WARNING) { + flb_plg_warn(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_NOTICE || level == FLB_KAFKA_LOG_INFO) { + flb_plg_info(ctx->ins, "%s: %s", + rk ? rd_kafka_name(rk) : NULL, buf); + } + else if (level == FLB_KAFKA_LOG_DEBUG) { + flb_plg_debug(ctx->ins, "%s: %s", + rk ? 
rd_kafka_name(rk) : NULL, buf); + } +} + +static int cb_kafka_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + struct flb_out_kafka *ctx; + + /* Configuration */ + ctx = flb_out_kafka_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "failed to initialize"); + return -1; + } + + /* Set global context */ + flb_output_set_context(ins, ctx); + return 0; +} + +int produce_message(struct flb_time *tm, msgpack_object *map, + struct flb_out_kafka *ctx, struct flb_config *config) +{ + int i; + int ret; + int size; + int queue_full_retries = 0; + char *out_buf; + size_t out_size; + struct mk_list *head; + struct mk_list *topics; + struct flb_split_entry *entry; + char *dynamic_topic; + char *message_key = NULL; + size_t message_key_len = 0; + struct flb_kafka_topic *topic = NULL; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_object key; + msgpack_object val; + flb_sds_t s; + +#ifdef FLB_HAVE_AVRO_ENCODER + // used to flag when a buffer needs to be freed for avro + bool avro_fast_buffer = true; + + // avro encoding uses a buffer + // the majority of lines are fairly small + // so using static buffer for these is much more efficient + // larger sizes will allocate +#ifndef AVRO_DEFAULT_BUFFER_SIZE +#define AVRO_DEFAULT_BUFFER_SIZE 2048 +#endif + static char avro_buff[AVRO_DEFAULT_BUFFER_SIZE]; + + // don't take lines that are too large + // these lines will log a warning + // this roughly a log line of 250000 chars +#ifndef AVRO_LINE_MAX_LEN +#define AVRO_LINE_MAX_LEN 1000000 + + // this is a convenience +#define AVRO_FREE(X, Y) if (!X) { flb_free(Y); } +#endif + + // this is just to keep the code cleaner + // the avro encoding includes + // an embedded schemaid which is used + // the embedding is a null byte + // followed by a 16 byte schemaid +#define AVRO_SCHEMA_OVERHEAD 16 + 1 +#endif + + flb_debug("in produce_message\n"); + if (flb_log_check(FLB_LOG_DEBUG)) + msgpack_object_print(stderr, *map); + + /* Init 
temporal buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) { + /* Make room for the timestamp */ + size = map->via.map.size + 1; + msgpack_pack_map(&mp_pck, size); + + /* Pack timestamp */ + msgpack_pack_str(&mp_pck, ctx->timestamp_key_len); + msgpack_pack_str_body(&mp_pck, + ctx->timestamp_key, ctx->timestamp_key_len); + switch (ctx->timestamp_format) { + case FLB_JSON_DATE_DOUBLE: + msgpack_pack_double(&mp_pck, flb_time_to_double(tm)); + break; + + case FLB_JSON_DATE_ISO8601: + case FLB_JSON_DATE_ISO8601_NS: + { + size_t date_len; + int len; + struct tm _tm; + char time_formatted[36]; + + /* Format the time; use microsecond precision (not nanoseconds). */ + gmtime_r(&tm->tm.tv_sec, &_tm); + date_len = strftime(time_formatted, sizeof(time_formatted) - 1, + FLB_JSON_DATE_ISO8601_FMT, &_tm); + + if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) { + len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, + ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000); + } + else { + /* FLB_JSON_DATE_ISO8601_NS */ + len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, + ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec); + } + date_len += len; + + msgpack_pack_str(&mp_pck, date_len); + msgpack_pack_str_body(&mp_pck, time_formatted, date_len); + } + break; + } + } + else { + size = map->via.map.size; + msgpack_pack_map(&mp_pck, size); + } + + for (i = 0; i < map->via.map.size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + + msgpack_pack_object(&mp_pck, key); + msgpack_pack_object(&mp_pck, val); + + /* Lookup message key */ + if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->message_key_field_len && + strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) { + message_key = 
(char *) val.via.str.ptr; + message_key_len = val.via.str.size; + } + } + + /* Lookup key/topic */ + if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->topic_key_len && + strncmp(key.via.str.ptr, ctx->topic_key, ctx->topic_key_len) == 0) { + topic = flb_kafka_topic_lookup((char *) val.via.str.ptr, + val.via.str.size, ctx); + /* Add extracted topic on the fly to topiclist */ + if (ctx->dynamic_topic) { + /* Only if default topic is set and this topicname is not set for this message */ + if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 && + (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) { + if (memchr(val.via.str.ptr, ',', val.via.str.size)) { + /* Don't allow commas in kafkatopic name */ + flb_warn("',' not allowed in dynamic_kafka topic names"); + continue; + } + if (val.via.str.size > 64) { + /* Don't allow length of dynamic kafka topics > 64 */ + flb_warn(" dynamic kafka topic length > 64 not allowed"); + continue; + } + dynamic_topic = flb_malloc(val.via.str.size + 1); + if (!dynamic_topic) { + /* Use default topic */ + flb_errno(); + continue; + } + strncpy(dynamic_topic, val.via.str.ptr, val.via.str.size); + dynamic_topic[val.via.str.size] = '\0'; + topics = flb_utils_split(dynamic_topic, ',', 0); + if (!topics) { + /* Use the default topic */ + flb_errno(); + flb_free(dynamic_topic); + continue; + } + mk_list_foreach(head, topics) { + /* Add the (one) found topicname to the topic configuration */ + entry = mk_list_entry(head, struct flb_split_entry, _head); + topic = flb_kafka_topic_create(entry->value, ctx); + if (!topic) { + /* Use default topic */ + flb_error("[out_kafka] cannot register topic '%s'", + entry->value); + topic = flb_kafka_topic_lookup((char *) val.via.str.ptr, + val.via.str.size, ctx); + } + else { + flb_info("[out_kafka] new topic added: %s", dynamic_topic); + } + } + flb_free(dynamic_topic); + } + } + } + } + } + + if (ctx->format == 
FLB_KAFKA_FMT_JSON) { + s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (!s) { + flb_plg_error(ctx->ins, "error encoding to JSON"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = s; + out_size = flb_sds_len(out_buf); + } + else if (ctx->format == FLB_KAFKA_FMT_MSGP) { + out_buf = mp_sbuf.data; + out_size = mp_sbuf.size; + } + else if (ctx->format == FLB_KAFKA_FMT_GELF) { + s = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size, + tm, &(ctx->gelf_fields)); + if (s == NULL) { + flb_plg_error(ctx->ins, "error encoding to GELF"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = s; + out_size = flb_sds_len(s); + } +#ifdef FLB_HAVE_AVRO_ENCODER + else if (ctx->format == FLB_KAFKA_FMT_AVRO) { + + flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id); + flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str); + + // if there's no data then log it and return + if (mp_sbuf.size == 0) { + flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; + } + + // is the line is too long log it and return + if (mp_sbuf.size > AVRO_LINE_MAX_LEN) { + flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; + } + + flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id); + out_buf = avro_buff; + out_size = AVRO_DEFAULT_BUFFER_SIZE; + + if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) { + flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id); + avro_fast_buffer = false; + // avro will always be smaller than 
msgpack + // it contains no meta-info aside from the schemaid + // all the metadata is in the schema which is not part of the msg + // add schemaid + magic byte for safety buffer and allocate + // that's 16 byte schemaid and one byte magic byte + out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD; + out_buf = flb_malloc(out_size); + if (!out_buf) { + flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + } + + if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) { + flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!avro_fast_buffer) { + flb_free(out_buf); + } + return FLB_ERROR; + } + + } +#endif + + if (!message_key) { + message_key = ctx->message_key; + message_key_len = ctx->message_key_len; + } + + if (!topic) { + topic = flb_kafka_topic_default(ctx); + } + if (!topic) { + flb_plg_error(ctx->ins, "no default topic found"); + msgpack_sbuffer_destroy(&mp_sbuf); +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + return FLB_ERROR; + } + + retry: + /* + * If the local rdkafka queue is full, we retry up to 'queue_full_retries' + * times set by the configuration (default: 10). If the configuration + * property was set to '0' or 'false', we don't impose a limit. Use that + * value under your own risk. 
+ */ + if (ctx->queue_full_retries > 0 && + queue_full_retries >= ctx->queue_full_retries) { + if (ctx->format != FLB_KAFKA_FMT_MSGP) { + flb_sds_destroy(s); + } + msgpack_sbuffer_destroy(&mp_sbuf); +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + /* + * Unblock the flush requests so that the + * engine could try sending data again. + */ + ctx->blocked = FLB_FALSE; + return FLB_RETRY; + } + + ret = rd_kafka_produce(topic->tp, + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, + out_buf, out_size, + message_key, message_key_len, + ctx); + + if (ret == -1) { + flb_error( + "%% Failed to produce to topic %s: %s\n", + rd_kafka_topic_name(topic->tp), + rd_kafka_err2str(rd_kafka_last_error())); + + /* + * rdkafka queue is full, keep trying 'locally' for a few seconds, + * otherwise let the caller to issue a main retry againt the engine. + */ + if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + flb_plg_warn(ctx->ins, + "internal queue is full, retrying in one second"); + + /* + * If the queue is full, first make sure to discard any further + * flush request from the engine. This means 'the caller will + * issue a retry at a later time'. + */ + ctx->blocked = FLB_TRUE; + + /* + * Next step is to give it some time to the background rdkafka + * library to do it own work. By default rdkafka wait 1 second + * or up to 10000 messages to be enqueued before delivery. + * + * If the kafka broker is down we should try a couple of times + * to enqueue this message, if we exceed 10 times, we just + * issue a full retry of the data chunk. 
+ */ + flb_time_sleep(1000); + rd_kafka_poll(ctx->kafka.rk, 0); + + /* Issue a re-try */ + queue_full_retries++; + goto retry; + } + } + else { + flb_plg_debug(ctx->ins, "enqueued message (%zd bytes) for topic '%s'", + out_size, rd_kafka_topic_name(topic->tp)); + } + ctx->blocked = FLB_FALSE; + + rd_kafka_poll(ctx->kafka.rk, 0); + if (ctx->format == FLB_KAFKA_FMT_JSON) { + flb_sds_destroy(s); + } + if (ctx->format == FLB_KAFKA_FMT_GELF) { + flb_sds_destroy(s); + } +#ifdef FLB_HAVE_AVRO_ENCODER + if (ctx->format == FLB_KAFKA_FMT_AVRO) { + AVRO_FREE(avro_fast_buffer, out_buf) + } +#endif + + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_OK; +} + +static void cb_kafka_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + + int ret; + struct flb_out_kafka *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + /* + * If the context is blocked, means rdkafka queue is full and no more + * messages can be appended. For our called (Fluent Bit engine) means + * that is not possible to work on this now and it need to 'retry'. 
+ */ + if (ctx->blocked == FLB_TRUE) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Iterate the original buffer and perform adjustments */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + ret = produce_message(&log_event.timestamp, + log_event.body, + ctx, config); + + if (ret != FLB_OK) { + flb_log_event_decoder_destroy(&log_decoder); + + FLB_OUTPUT_RETURN(ret); + } + } + + flb_log_event_decoder_destroy(&log_decoder); + + FLB_OUTPUT_RETURN(FLB_OK); +} + +static void kafka_flush_force(struct flb_out_kafka *ctx, + struct flb_config *config) +{ + int ret; + + if (!ctx) { + return; + } + + if (ctx->kafka.rk) { + ret = rd_kafka_flush(ctx->kafka.rk, config->grace * 1000); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { + flb_plg_warn(ctx->ins, "Failed to force flush: %s", + rd_kafka_err2str(ret)); + } + } +} + +static int cb_kafka_exit(void *data, struct flb_config *config) +{ + struct flb_out_kafka *ctx = data; + + kafka_flush_force(ctx, config); + flb_out_kafka_destroy(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "topic_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, topic_key), + "Which record to use as the kafka topic." + }, + { + FLB_CONFIG_MAP_BOOL, "dynamic_topic", "false", + 0, FLB_TRUE, offsetof(struct flb_out_kafka, dynamic_topic), + "Activate dynamic topics." + }, + { + FLB_CONFIG_MAP_STR, "format", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str), + "Set the record output format." + }, + { + FLB_CONFIG_MAP_STR, "message_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key), + "Which record key to use as the message data." 
+ }, + { + FLB_CONFIG_MAP_STR, "message_key_field", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key_field), + "Which record key field to use as the message data." + }, + { + FLB_CONFIG_MAP_STR, "timestamp_key", FLB_KAFKA_TS_KEY, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_key), + "Set the key for the the timestamp." + }, + { + FLB_CONFIG_MAP_STR, "timestamp_format", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str), + "Set the format the timestamp is in." + }, + { + FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries), + "Set the number of local retries to enqueue the data." + }, + { + FLB_CONFIG_MAP_STR, "gelf_timestamp_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the timestamp key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_host_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the host key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_short_message_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the short message key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_full_message_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the full message key for gelf output." + }, + { + FLB_CONFIG_MAP_STR, "gelf_level_key", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the level key for gelf output." + }, +#ifdef FLB_HAVE_AVRO_ENCODER + { + FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL, + 0, FLB_FALSE, 0, + "Set AVRO schema." + }, + { + FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set AVRO schema ID." + }, +#endif + { + FLB_CONFIG_MAP_STR, "topics", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka topics, delimited by commas." + }, + { + FLB_CONFIG_MAP_STR, "brokers", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka brokers, delimited by commas." + }, + { + FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka client_id." 
+ }, + { + FLB_CONFIG_MAP_STR, "group_id", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the kafka group_id." + }, + { + FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL, + //FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, rdkafka_opts), + 0, FLB_FALSE, 0, + "Set the kafka group_id." + }, + /* EOF */ + {0} +}; + +struct flb_output_plugin out_kafka_plugin = { + .name = "kafka", + .description = "Kafka", + .cb_init = cb_kafka_init, + .cb_flush = cb_kafka_flush, + .cb_exit = cb_kafka_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/fluent-bit/plugins/out_kafka/kafka_callbacks.h b/fluent-bit/plugins/out_kafka/kafka_callbacks.h new file mode 100644 index 000000000..f496cba8c --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka_callbacks.h @@ -0,0 +1,31 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_OUT_KAFKA_CALLBACKS_H +#define FLB_OUT_KAFKA_CALLBACKS_H + +#include "rdkafka.h" + +void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, + void *opaque); + +void cb_kafka_logger(const rd_kafka_t *rk, int level, + const char *fac, const char *buf); + +#endif diff --git a/fluent-bit/plugins/out_kafka/kafka_config.c b/fluent-bit/plugins/out_kafka/kafka_config.c new file mode 100644 index 000000000..3c00f3682 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka_config.c @@ -0,0 +1,253 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_utils.h> + +#include "kafka_config.h" +#include "kafka_topic.h" +#include "kafka_callbacks.h" + +struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + char errstr[512]; + struct mk_list *head; + struct mk_list *topics; + struct flb_split_entry *entry; + struct flb_out_kafka *ctx; + + /* Configuration context */ + ctx = flb_calloc(1, sizeof(struct flb_out_kafka)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + ctx->blocked = FLB_FALSE; + + ret = flb_output_config_map_set(ins, (void*) ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration."); + flb_free(ctx); + + return NULL; + } + + /* rdkafka config context */ + ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0); + if (!ctx->conf) { + flb_plg_error(ctx->ins, "error creating context"); + flb_free(ctx); + return NULL; + } + + /* Set our global opaque data (plugin context*/ + rd_kafka_conf_set_opaque(ctx->conf, ctx); + + /* Callback: message delivery */ + rd_kafka_conf_set_dr_msg_cb(ctx->conf, cb_kafka_msg); + + /* Callback: log */ + rd_kafka_conf_set_log_cb(ctx->conf, cb_kafka_logger); + + /* Config: Topic_Key */ + if (ctx->topic_key) { + ctx->topic_key_len = strlen(ctx->topic_key); + } + + /* Config: Format */ + if (ctx->format_str) { + if (strcasecmp(ctx->format_str, "json") == 0) { + ctx->format = FLB_KAFKA_FMT_JSON; + } + else if (strcasecmp(ctx->format_str, "msgpack") == 0) { + ctx->format = FLB_KAFKA_FMT_MSGP; + } + else if (strcasecmp(ctx->format_str, "gelf") == 0) { + ctx->format = FLB_KAFKA_FMT_GELF; + } +#ifdef FLB_HAVE_AVRO_ENCODER + else if (strcasecmp(ctx->format_str, "avro") == 0) { + ctx->format = FLB_KAFKA_FMT_AVRO; + } +#endif + } + else { + ctx->format = FLB_KAFKA_FMT_JSON; + } + + /* Config: 
Message_Key */ + if (ctx->message_key) { + ctx->message_key_len = strlen(ctx->message_key); + } + else { + ctx->message_key_len = 0; + } + + /* Config: Message_Key_Field */ + if (ctx->message_key_field) { + ctx->message_key_field_len = strlen(ctx->message_key_field); + } + else { + ctx->message_key_field_len = 0; + } + + /* Config: Timestamp_Key */ + if (ctx->timestamp_key) { + ctx->timestamp_key_len = strlen(ctx->timestamp_key); + } + + /* Config: Timestamp_Format */ + ctx->timestamp_format = FLB_JSON_DATE_DOUBLE; + if (ctx->timestamp_format_str) { + if (strcasecmp(ctx->timestamp_format_str, "iso8601") == 0) { + ctx->timestamp_format = FLB_JSON_DATE_ISO8601; + } + else if (strcasecmp(ctx->timestamp_format_str, "iso8601_ns") == 0) { + ctx->timestamp_format = FLB_JSON_DATE_ISO8601_NS; + } + } + + /* set number of retries: note that if the number is zero, means forever */ + if (ctx->queue_full_retries < 0) { + ctx->queue_full_retries = 0; + } + + /* Config Gelf_Short_Message_Key */ + tmp = flb_output_get_property("gelf_short_message_key", ins); + if (tmp) { + ctx->gelf_fields.short_message_key = flb_sds_create(tmp); + } + + /* Config Gelf_Full_Message_Key */ + tmp = flb_output_get_property("gelf_full_message_key", ins); + if (tmp) { + ctx->gelf_fields.full_message_key = flb_sds_create(tmp); + } + + /* Config Gelf_Level_Key */ + tmp = flb_output_get_property("gelf_level_key", ins); + if (tmp) { + ctx->gelf_fields.level_key = flb_sds_create(tmp); + } + + /* Kafka Producer */ + ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf, + errstr, sizeof(errstr)); + if (!ctx->kafka.rk) { + flb_plg_error(ctx->ins, "failed to create producer: %s", + errstr); + flb_out_kafka_destroy(ctx); + return NULL; + } + +#ifdef FLB_HAVE_AVRO_ENCODER + /* Config AVRO */ + tmp = flb_output_get_property("schema_str", ins); + if (tmp) { + ctx->avro_fields.schema_str = flb_sds_create(tmp); + } + tmp = flb_output_get_property("schema_id", ins); + if (tmp) { + ctx->avro_fields.schema_id = 
flb_sds_create(tmp); + } +#endif + + /* Config: Topic */ + mk_list_init(&ctx->topics); + tmp = flb_output_get_property("topics", ins); + if (!tmp) { + flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx); + } + else { + topics = flb_utils_split(tmp, ',', -1); + if (!topics) { + flb_plg_warn(ctx->ins, "invalid topics defined, setting default"); + flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx); + } + else { + /* Register each topic */ + mk_list_foreach(head, topics) { + entry = mk_list_entry(head, struct flb_split_entry, _head); + if (!flb_kafka_topic_create(entry->value, ctx)) { + flb_plg_error(ctx->ins, "cannot register topic '%s'", + entry->value); + } + } + flb_utils_split_free(topics); + } + } + + flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp); +#ifdef FLB_HAVE_AVRO_ENCODER + flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str); +#endif + + return ctx; +} + +int flb_out_kafka_destroy(struct flb_out_kafka *ctx) +{ + if (!ctx) { + return 0; + } + + if (ctx->kafka.brokers) { + flb_free(ctx->kafka.brokers); + } + + flb_kafka_topic_destroy_all(ctx); + + if (ctx->kafka.rk) { + rd_kafka_destroy(ctx->kafka.rk); + } + + if (ctx->topic_key) { + flb_free(ctx->topic_key); + } + + if (ctx->message_key) { + flb_free(ctx->message_key); + } + + if (ctx->message_key_field) { + flb_free(ctx->message_key_field); + } + + flb_sds_destroy(ctx->gelf_fields.timestamp_key); + flb_sds_destroy(ctx->gelf_fields.host_key); + flb_sds_destroy(ctx->gelf_fields.short_message_key); + flb_sds_destroy(ctx->gelf_fields.full_message_key); + flb_sds_destroy(ctx->gelf_fields.level_key); + +#ifdef FLB_HAVE_AVRO_ENCODER + // avro + flb_sds_destroy(ctx->avro_fields.schema_id); + flb_sds_destroy(ctx->avro_fields.schema_str); +#endif + + flb_free(ctx); + return 0; +} diff --git a/fluent-bit/plugins/out_kafka/kafka_config.h b/fluent-bit/plugins/out_kafka/kafka_config.h new file mode 100644 index 000000000..1ef2cce16 --- /dev/null +++ 
b/fluent-bit/plugins/out_kafka/kafka_config.h @@ -0,0 +1,129 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_KAFKA_CONFIG_H +#define FLB_OUT_KAFKA_CONFIG_H + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_pack.h> +#ifdef FLB_HAVE_AVRO_ENCODER +#include <fluent-bit/flb_avro.h> +#endif + +#include <fluent-bit/flb_kafka.h> + +#define FLB_KAFKA_FMT_JSON 0 +#define FLB_KAFKA_FMT_MSGP 1 +#define FLB_KAFKA_FMT_GELF 2 +#ifdef FLB_HAVE_AVRO_ENCODER +#define FLB_KAFKA_FMT_AVRO 3 +#endif +#define FLB_KAFKA_TS_KEY "@timestamp" +#define FLB_KAFKA_QUEUE_FULL_RETRIES "10" + +/* rdkafka log levels based on syslog(3) */ +#define FLB_KAFKA_LOG_EMERG 0 +#define FLB_KAFKA_LOG_ALERT 1 +#define FLB_KAFKA_LOG_CRIT 2 +#define FLB_KAFKA_LOG_ERR 3 +#define FLB_KAFKA_LOG_WARNING 4 +#define FLB_KAFKA_LOG_NOTICE 5 +#define FLB_KAFKA_LOG_INFO 6 +#define FLB_KAFKA_LOG_DEBUG 7 + +#define FLB_JSON_DATE_DOUBLE 0 +#define FLB_JSON_DATE_ISO8601 1 +#define FLB_JSON_DATE_ISO8601_NS 2 +#define FLB_JSON_DATE_ISO8601_FMT "%Y-%m-%dT%H:%M:%S" + +struct flb_kafka_topic { + int name_len; + char *name; + rd_kafka_topic_t *tp; + struct mk_list _head; +}; + +struct flb_out_kafka { + struct flb_kafka kafka; + /* Config Parameters */ + int format; + flb_sds_t format_str; + + /* Optional topic key 
for routing */ + int topic_key_len; + char *topic_key; + + int timestamp_key_len; + char *timestamp_key; + int timestamp_format; + flb_sds_t timestamp_format_str; + + int message_key_len; + char *message_key; + + int message_key_field_len; + char *message_key_field; + + /* Gelf Keys */ + struct flb_gelf_fields gelf_fields; + + /* Head of defined topics by configuration */ + struct mk_list topics; + + /* + * Blocked Status: since rdkafka have it own buffering queue, there is a + * chance that the queue becomes full, when that happens our default + * behavior is the following: + * + * - out_kafka yields and try to continue every second until it succeed. In + * the meanwhile blocked flag gets FLB_TRUE value. + * - when flushing more records and blocked == FLB_TRUE, issue + * a retry. + */ + int blocked; + + int dynamic_topic; + + int queue_full_retries; + + /* Internal */ + rd_kafka_conf_t *conf; + + /* Plugin instance */ + struct flb_output_instance *ins; + +#ifdef FLB_HAVE_AVRO_ENCODER + // avro serialization requires a schema + // the schema is stored in json in avro_schema_str + // + // optionally the schema ID can be stashed in the avro data stream + // the schema ID is stored in avro_schema_id + // this is common at this time with large kafka installations and schema registries + // flb_sds_t avro_schema_str; + // flb_sds_t avro_schema_id; + struct flb_avro_fields avro_fields; +#endif + +}; + +struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_out_kafka_destroy(struct flb_out_kafka *ctx); + +#endif diff --git a/fluent-bit/plugins/out_kafka/kafka_topic.c b/fluent-bit/plugins/out_kafka/kafka_topic.c new file mode 100644 index 000000000..2db8698b1 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka_topic.c @@ -0,0 +1,120 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> + +#include "kafka_config.h" +#include "rdkafka.h" + +struct flb_kafka_topic *flb_kafka_topic_create(char *name, + struct flb_out_kafka *ctx) +{ + rd_kafka_topic_t *tp; + struct flb_kafka_topic *topic; + + tp = rd_kafka_topic_new(ctx->kafka.rk, name, NULL); + if (!tp) { + flb_plg_error(ctx->ins, "failed to create topic: %s", + rd_kafka_err2str(rd_kafka_last_error())); + return NULL; + } + + topic = flb_malloc(sizeof(struct flb_kafka_topic)); + if (!topic) { + flb_errno(); + return NULL; + } + + topic->name = flb_strdup(name); + topic->name_len = strlen(name); + topic->tp = tp; + mk_list_add(&topic->_head, &ctx->topics); + + return topic; +} + +int flb_kafka_topic_destroy(struct flb_kafka_topic *topic, + struct flb_out_kafka *ctx) +{ + mk_list_del(&topic->_head); + rd_kafka_topic_destroy(topic->tp); + flb_free(topic->name); + flb_free(topic); + + return 0; +} + +int flb_kafka_topic_destroy_all(struct flb_out_kafka *ctx) +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_kafka_topic *topic; + + mk_list_foreach_safe(head, tmp, &ctx->topics) { + topic = mk_list_entry(head, struct flb_kafka_topic, _head); + flb_kafka_topic_destroy(topic, ctx); + c++; + } + + return c; +} + +/* Get first topic of the list (default topic) */ +struct flb_kafka_topic *flb_kafka_topic_default(struct flb_out_kafka 
*ctx) +{ + struct flb_kafka_topic *topic; + + if (mk_list_is_empty(&ctx->topics) == 0) { + return NULL; + } + + topic = mk_list_entry_first(&ctx->topics, struct flb_kafka_topic, + _head); + return topic; +} + +struct flb_kafka_topic *flb_kafka_topic_lookup(char *name, + int name_len, + struct flb_out_kafka *ctx) +{ + struct mk_list *head; + struct flb_kafka_topic *topic; + + if (!ctx->topic_key) { + return flb_kafka_topic_default(ctx); + } + + mk_list_foreach(head, &ctx->topics) { + topic = mk_list_entry(head, struct flb_kafka_topic, _head); + if (topic->name_len != name_len) { + continue; + } + + if (strncmp(name, topic->name, topic->name_len) == 0) { + return topic; + } + } + + /* No matches, return the default topic */ + return flb_kafka_topic_default(ctx); + +} diff --git a/fluent-bit/plugins/out_kafka/kafka_topic.h b/fluent-bit/plugins/out_kafka/kafka_topic.h new file mode 100644 index 000000000..9b1203b96 --- /dev/null +++ b/fluent-bit/plugins/out_kafka/kafka_topic.h @@ -0,0 +1,34 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_KAFKA_TOPIC_H +#define FLB_KAFKA_TOPIC_H + /* flb_kafka_topic_create(): create a topic entry for 'name', link it into ctx->topics and return it; returns NULL on failure. */ +struct flb_kafka_topic *flb_kafka_topic_create(char *name, + struct flb_out_kafka *ctx); /* flb_kafka_topic_destroy(): unlink 'topic' from its list, release its rdkafka topic handle and free the entry and its name; always returns 0. */ +int flb_kafka_topic_destroy(struct flb_kafka_topic *topic, + struct flb_out_kafka *ctx); /* flb_kafka_topic_destroy_all(): destroy every entry in ctx->topics; returns the number of entries destroyed. */ +int flb_kafka_topic_destroy_all(struct flb_out_kafka *ctx); /* flb_kafka_topic_default(): return the first entry of ctx->topics (the default topic), or NULL when the list is empty. */ +struct flb_kafka_topic *flb_kafka_topic_default(struct flb_out_kafka *ctx); + /* flb_kafka_topic_lookup(): return the entry whose name equals the 'name_len' bytes at 'name'; falls back to the default topic when ctx->topic_key is unset or nothing matches. */ +struct flb_kafka_topic *flb_kafka_topic_lookup(char *name, + int name_len, + struct flb_out_kafka *ctx); + +#endif |