diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/out_kafka_rest/kafka.c | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_kafka_rest/kafka.c')
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/kafka.c | 351 |
1 files changed, 351 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka_rest/kafka.c b/fluent-bit/plugins/out_kafka_rest/kafka.c new file mode 100644 index 000000000..f3b6153a6 --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/kafka.c @@ -0,0 +1,351 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <msgpack.h> + +#include "kafka.h" +#include "kafka_conf.h" + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "message_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key), + "Specify a message key. " + }, + + { + FLB_CONFIG_MAP_STR, "time_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key), + "Specify the name of the field that holds the record timestamp. " + }, + + { + FLB_CONFIG_MAP_STR, "topic", "fluent-bit", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, topic), + "Specify the kafka topic. " + }, + + { + FLB_CONFIG_MAP_STR, "url_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, url_path), + "Specify an optional HTTP URL path for the target web server, e.g: /something" + }, + + { + FLB_CONFIG_MAP_DOUBLE, "partition", "-1", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, partition), + "Specify kafka partition number. " + }, + + { + FLB_CONFIG_MAP_STR, "time_key_format", FLB_KAFKA_TIME_KEYF, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key_format), + "Specify the format of the timestamp. " + }, + + { + FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, include_tag_key), + "Specify whether to append tag name to final record. " + }, + + { + FLB_CONFIG_MAP_STR, "tag_key", "_flb-key", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, tag_key), + "Specify the key name of the record if include_tag_key is enabled. " + }, + { + FLB_CONFIG_MAP_BOOL, "avro_http_header", "false", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, avro_http_header), + "Specify if the format has avro header in http request" + }, + + /* EOF */ + {0} +}; +/* + * Convert the internal Fluent Bit data representation to the required + * one by Kafka REST Proxy. + */ +static flb_sds_t kafka_rest_format(const void *data, size_t bytes, + const char *tag, int tag_len, + size_t *out_size, + struct flb_kafka_rest *ctx) +{ + int i; + int len; + int arr_size = 0; + int map_size; + size_t s; + flb_sds_t out_buf; + char time_formatted[256]; + msgpack_object map; + msgpack_object key; + msgpack_object val; + struct tm tm; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int ret; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return NULL; + } + + /* Init temporary buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Count number of entries */ + arr_size = flb_mp_count(data, bytes); + + /* Root map */ + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "records", 7); + + msgpack_pack_array(&mp_pck, arr_size); + + /* Iterate and compose array content */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map = *log_event.body; + map_size = 1; + + if (ctx->partition >= 0) { + map_size++; + } + + if (ctx->message_key != NULL) { + map_size++; + } + + msgpack_pack_map(&mp_pck, map_size); + if (ctx->partition >= 0) { + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "partition", 9); + msgpack_pack_int64(&mp_pck, ctx->partition); + } + + + if (ctx->message_key != NULL) { + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "key", 3); + msgpack_pack_str(&mp_pck, ctx->message_key_len); + msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len); + } + + /* Value Map Size */ + map_size = map.via.map.size; + map_size++; + if (ctx->include_tag_key == FLB_TRUE) { + map_size++; + } + + msgpack_pack_str(&mp_pck, 5); + msgpack_pack_str_body(&mp_pck, "value", 5); + + msgpack_pack_map(&mp_pck, map_size); + + /* Time key and time formatted */ + msgpack_pack_str(&mp_pck, ctx->time_key_len); + msgpack_pack_str_body(&mp_pck, ctx->time_key, ctx->time_key_len); + + /* Format the time */ + gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); + s = strftime(time_formatted, sizeof(time_formatted) - 1, + ctx->time_key_format, &tm); + len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, + ".%09" PRIu64 "Z", (uint64_t) log_event.timestamp.tm.tv_nsec); + s += len; + msgpack_pack_str(&mp_pck, s); + msgpack_pack_str_body(&mp_pck, time_formatted, s); + + /* Tag Key */ + if (ctx->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&mp_pck, ctx->tag_key_len); + msgpack_pack_str_body(&mp_pck, ctx->tag_key, ctx->tag_key_len); + msgpack_pack_str(&mp_pck, tag_len); + msgpack_pack_str_body(&mp_pck, tag, tag_len); + } + + for (i = 0; i < map.via.map.size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + msgpack_pack_object(&mp_pck, key); + msgpack_pack_object(&mp_pck, val); + } + } + flb_log_event_decoder_destroy(&log_decoder); + + /* Convert to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + return NULL; + } + + *out_size = flb_sds_len(out_buf); + + return out_buf; +} + +static int cb_kafka_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + (void) ins; + (void) config; + (void) data; + struct flb_kafka_rest *ctx; + + ctx = flb_kr_conf_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "cannot initialize plugin"); + return -1; + } + + flb_plg_debug(ctx->ins, "host=%s port=%i", + ins->host.name, ins->host.port); + flb_output_set_context(ins, ctx); + + return 0; +} + +static void cb_kafka_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + flb_sds_t js; + size_t js_size; + size_t b_sent; + struct flb_http_client *c; + struct flb_connection *u_conn; + struct flb_kafka_rest *ctx = out_context; + (void) i_ins; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Convert format */ + js = kafka_rest_format(event_chunk->data, event_chunk->size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + &js_size, ctx); + if (!js) { + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + js, js_size, NULL, 0, NULL, 0); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + if (ctx->avro_http_header == FLB_TRUE) { + flb_http_add_header(c, + "Content-Type", 12, + "application/vnd.kafka.avro.v2+json", 34); + } + else { + flb_http_add_header(c, + "Content-Type", 12, + "application/vnd.kafka.json.v2+json", 34); + } + + if (ctx->http_user && ctx->http_passwd) { + flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + } + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + goto retry; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); + if (c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "Kafka REST response\n%s", + c->resp.payload); + } + goto retry; + } + + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "Kafka REST response\n%s", + c->resp.payload); + } + else { + goto retry; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + flb_sds_destroy(js); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_OK); + + /* Issue a retry */ + retry: + flb_http_client_destroy(c); + flb_sds_destroy(js); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_RETRY); +} + +static int cb_kafka_exit(void *data, struct flb_config *config) +{ + struct flb_kafka_rest *ctx = data; + + flb_kr_conf_destroy(ctx); + return 0; +} + +struct flb_output_plugin out_kafka_rest_plugin = { + .name = "kafka-rest", + .description = "Kafka REST Proxy", + .cb_init = cb_kafka_init, + .cb_flush = cb_kafka_flush, + .cb_exit = cb_kafka_exit, + .config_map = config_map, + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, +}; |