diff options
Diffstat (limited to 'fluent-bit/plugins/out_kafka_rest')
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/CMakeLists.txt | 5 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/kafka.c | 351 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/kafka.h | 66 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/kafka_conf.c | 223 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kafka_rest/kafka_conf.h | 33 |
5 files changed, 678 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt b/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt new file mode 100644 index 000000000..39df92f77 --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/CMakeLists.txt @@ -0,0 +1,5 @@ +set(src + kafka_conf.c + kafka.c) + +FLB_PLUGIN(out_kafka_rest "${src}" "") diff --git a/fluent-bit/plugins/out_kafka_rest/kafka.c b/fluent-bit/plugins/out_kafka_rest/kafka.c new file mode 100644 index 000000000..f3b6153a6 --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/kafka.c @@ -0,0 +1,351 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <msgpack.h> + +#include "kafka.h" +#include "kafka_conf.h" + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "message_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key), + "Specify a message key. " + }, + + { + FLB_CONFIG_MAP_STR, "time_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key), + "Specify the name of the field that holds the record timestamp. " + }, + + { + FLB_CONFIG_MAP_STR, "topic", "fluent-bit", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, topic), + "Specify the kafka topic. " + }, + + { + FLB_CONFIG_MAP_STR, "url_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, url_path), + "Specify an optional HTTP URL path for the target web server, e.g: /something" + }, + + { + FLB_CONFIG_MAP_DOUBLE, "partition", "-1", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, partition), + "Specify kafka partition number. " + }, + + { + FLB_CONFIG_MAP_STR, "time_key_format", FLB_KAFKA_TIME_KEYF, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key_format), + "Specify the format of the timestamp. " + }, + + { + FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, include_tag_key), + "Specify whether to append tag name to final record. " + }, + + { + FLB_CONFIG_MAP_STR, "tag_key", "_flb-key", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, tag_key), + "Specify the key name of the record if include_tag_key is enabled. " + }, + { + FLB_CONFIG_MAP_BOOL, "avro_http_header", "false", + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, avro_http_header), + "Specify if the format has avro header in http request" + }, + + /* EOF */ + {0} +}; +/* + * Convert the internal Fluent Bit data representation to the required + * one by Kafka REST Proxy. + */ +static flb_sds_t kafka_rest_format(const void *data, size_t bytes, + const char *tag, int tag_len, + size_t *out_size, + struct flb_kafka_rest *ctx) +{ + int i; + int len; + int arr_size = 0; + int map_size; + size_t s; + flb_sds_t out_buf; + char time_formatted[256]; + msgpack_object map; + msgpack_object key; + msgpack_object val; + struct tm tm; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int ret; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return NULL; + } + + /* Init temporary buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Count number of entries */ + arr_size = flb_mp_count(data, bytes); + + /* Root map */ + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "records", 7); + + msgpack_pack_array(&mp_pck, arr_size); + + /* Iterate and compose array content */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map = *log_event.body; + map_size = 1; + + if (ctx->partition >= 0) { + map_size++; + } + + if (ctx->message_key != NULL) { + map_size++; + } + + msgpack_pack_map(&mp_pck, map_size); + if (ctx->partition >= 0) { + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "partition", 9); + msgpack_pack_int64(&mp_pck, ctx->partition); + } + + + if (ctx->message_key != NULL) { + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "key", 3); + msgpack_pack_str(&mp_pck, ctx->message_key_len); + msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len); + } + + /* Value Map Size */ + map_size = map.via.map.size; + map_size++; + if (ctx->include_tag_key == FLB_TRUE) { + map_size++; + } + + msgpack_pack_str(&mp_pck, 5); + msgpack_pack_str_body(&mp_pck, "value", 5); + + msgpack_pack_map(&mp_pck, map_size); + + /* Time key and time formatted */ + msgpack_pack_str(&mp_pck, ctx->time_key_len); + msgpack_pack_str_body(&mp_pck, ctx->time_key, ctx->time_key_len); + + /* Format the time */ + gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); + s = strftime(time_formatted, sizeof(time_formatted) - 1, + ctx->time_key_format, &tm); + len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, + ".%09" PRIu64 "Z", (uint64_t) log_event.timestamp.tm.tv_nsec); + s += len; + msgpack_pack_str(&mp_pck, s); + msgpack_pack_str_body(&mp_pck, time_formatted, s); + + /* Tag Key */ + if (ctx->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&mp_pck, ctx->tag_key_len); + msgpack_pack_str_body(&mp_pck, ctx->tag_key, ctx->tag_key_len); + msgpack_pack_str(&mp_pck, tag_len); + msgpack_pack_str_body(&mp_pck, tag, tag_len); + } + + for (i = 0; i < map.via.map.size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + msgpack_pack_object(&mp_pck, key); + msgpack_pack_object(&mp_pck, val); + } + } + flb_log_event_decoder_destroy(&log_decoder); + + /* Convert to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + return NULL; + } + + *out_size = flb_sds_len(out_buf); + + return out_buf; +} + +static int cb_kafka_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + (void) ins; + (void) config; + (void) data; + struct flb_kafka_rest *ctx; + + ctx = flb_kr_conf_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "cannot initialize plugin"); + return -1; + } + + flb_plg_debug(ctx->ins, "host=%s port=%i", + ins->host.name, ins->host.port); + flb_output_set_context(ins, ctx); + + return 0; +} + +static void cb_kafka_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + flb_sds_t js; + size_t js_size; + size_t b_sent; + struct flb_http_client *c; + struct flb_connection *u_conn; + struct flb_kafka_rest *ctx = out_context; + (void) i_ins; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Convert format */ + js = kafka_rest_format(event_chunk->data, event_chunk->size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + &js_size, ctx); + if (!js) { + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + js, js_size, NULL, 0, NULL, 0); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + if (ctx->avro_http_header == FLB_TRUE) { + flb_http_add_header(c, + "Content-Type", 12, + "application/vnd.kafka.avro.v2+json", 34); + } + else { + flb_http_add_header(c, + "Content-Type", 12, + "application/vnd.kafka.json.v2+json", 34); + } + + if (ctx->http_user && ctx->http_passwd) { + flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + } + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + goto retry; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); + if (c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "Kafka REST response\n%s", + c->resp.payload); + } + goto retry; + } + + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "Kafka REST response\n%s", + c->resp.payload); + } + else { + goto retry; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + flb_sds_destroy(js); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_OK); + + /* Issue a retry */ + retry: + flb_http_client_destroy(c); + flb_sds_destroy(js); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_RETRY); +} + +static int cb_kafka_exit(void *data, struct flb_config *config) +{ + struct flb_kafka_rest *ctx = data; + + flb_kr_conf_destroy(ctx); + return 0; +} + +struct flb_output_plugin out_kafka_rest_plugin = { + .name = "kafka-rest", + .description = "Kafka REST Proxy", + .cb_init = cb_kafka_init, + .cb_flush = cb_kafka_flush, + .cb_exit = cb_kafka_exit, + .config_map = config_map, + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, +}; diff --git a/fluent-bit/plugins/out_kafka_rest/kafka.h b/fluent-bit/plugins/out_kafka_rest/kafka.h new file mode 100644 index 000000000..c2d220e7d --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/kafka.h @@ -0,0 +1,66 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_KAFKA_REST_H +#define FLB_OUT_KAFKA_REST_H + +#define FLB_KAFKA_TIME_KEY "@timestamp" +#define FLB_KAFKA_TIME_KEYF "%Y-%m-%dT%H:%M:%S" +#define FLB_KAFKA_TAG_KEY "_flb-key" + +struct flb_kafka_rest { + /* Kafka specifics */ + long partition; + char *topic; + int message_key_len; + char *message_key; + + /* HTTP Auth */ + char *http_user; + char *http_passwd; + + /* time key */ + int time_key_len; + char *time_key; + + /* time key format */ + int time_key_format_len; + char *time_key_format; + + /* include_tag_key */ + int include_tag_key; + int tag_key_len; + char *tag_key; + + /* HTTP URI */ + char uri[256]; + char *url_path; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Plugin instance */ + struct flb_output_instance *ins; + + /* Avro http header*/ + int avro_http_header; +}; + + +#endif diff --git a/fluent-bit/plugins/out_kafka_rest/kafka_conf.c b/fluent-bit/plugins/out_kafka_rest/kafka_conf.c new file mode 100644 index 000000000..3df50eb8b --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/kafka_conf.c @@ -0,0 +1,223 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_utils.h> + +#include "kafka.h" +#include "kafka_conf.h" + +struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + long part; + int io_flags = 0; + const char *tmp; + char *endptr; + struct flb_upstream *upstream; + struct flb_kafka_rest *ctx; + int ret; + + /* Allocate context */ + ctx = flb_calloc(1, sizeof(struct flb_kafka_rest)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Get network configuration */ + flb_output_net_default("127.0.0.1", 8082, ins); + + /* use TLS ? */ + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, + ins->tls); + if (!upstream) { + flb_plg_error(ctx->ins, "cannot create Upstream context"); + flb_kr_conf_destroy(ctx); + return NULL; + } + ctx->u = upstream; + flb_output_upstream_set(ctx->u, ins); + + flb_output_upstream_set(ctx->u, ins); + + /* HTTP Auth */ + tmp = flb_output_get_property("http_user", ins); + if (tmp) { + ctx->http_user = flb_strdup(tmp); + + tmp = flb_output_get_property("http_passwd", ins); + if (tmp) { + ctx->http_passwd = flb_strdup(tmp); + } + else { + ctx->http_passwd = flb_strdup(""); + } + } + + /* Time Key */ + tmp = flb_output_get_property("time_key", ins); + if (tmp) { + ctx->time_key = flb_strdup(tmp); + ctx->time_key_len = strlen(tmp); + } + else { + ctx->time_key = flb_strdup(FLB_KAFKA_TIME_KEY); + ctx->time_key_len = sizeof(FLB_KAFKA_TIME_KEY) - 1; + } + + /* Time Key Format */ + tmp = flb_output_get_property("time_key_format", ins); + if (tmp) { + ctx->time_key_format = flb_strdup(tmp); + ctx->time_key_format_len = strlen(tmp); + } + else { + ctx->time_key_format = flb_strdup(FLB_KAFKA_TIME_KEYF); + ctx->time_key_format_len = sizeof(FLB_KAFKA_TIME_KEYF) - 1; + } + + /* Include Tag key */ + tmp = flb_output_get_property("include_tag_key", ins); + if (tmp) { + ctx->include_tag_key = flb_utils_bool(tmp); + } + else { + ctx->include_tag_key = FLB_FALSE; + } + + /* Tag Key */ + if (ctx->include_tag_key == FLB_TRUE) { + tmp = flb_output_get_property("tag_key", ins); + if (tmp) { + ctx->tag_key = flb_strdup(tmp); + ctx->tag_key_len = strlen(tmp); + if (tmp[0] != '_') { + flb_plg_warn(ctx->ins, "consider use a tag_key " + "that starts with '_'"); + } + } + else { + ctx->tag_key = flb_strdup(FLB_KAFKA_TAG_KEY); + ctx->tag_key_len = sizeof(FLB_KAFKA_TAG_KEY) - 1; + } + } + + /* Kafka: partition */ + tmp = flb_output_get_property("partition", ins); + if (tmp) { + errno = 0; + part = strtol(tmp, &endptr, 10); + if ((errno == ERANGE && (part == LONG_MAX || part == LONG_MIN)) + || (errno != 0 && part == 0)) { + flb_plg_error(ctx->ins, "invalid partition number"); + } + + if (endptr == tmp) { + flb_plg_error(ctx->ins, "invalid partition number"); + } + ctx->partition = part; + } + else { + ctx->partition = -1; + } + + /* Kafka: topic */ + tmp = flb_output_get_property("topic", ins); + if (tmp) { + ctx->topic = flb_strdup(tmp); + } + else { + ctx->topic = flb_strdup("fluent-bit"); + } + + /* Set partition based on topic */ + tmp = flb_output_get_property("url_path", ins); + if (tmp) { + ctx->url_path = flb_strdup(tmp); + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/topics/%s", ctx->url_path, ctx->topic); + } else { + ctx->url_path = NULL; + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/topics/%s", ctx->topic); + } + + /* Kafka: message key */ + tmp = flb_output_get_property("message_key", ins); + if (tmp) { + ctx->message_key = flb_strdup(tmp); + ctx->message_key_len = strlen(tmp); + } + else { + ctx->message_key = NULL; + ctx->message_key_len = 0; + } + + return ctx; +} + +int flb_kr_conf_destroy(struct flb_kafka_rest *ctx) +{ + flb_free(ctx->topic); + flb_free(ctx->http_user); + flb_free(ctx->http_passwd); + + flb_free(ctx->time_key); + flb_free(ctx->time_key_format); + + if (ctx->url_path) { + flb_free(ctx->url_path); + } + + if (ctx->include_tag_key) { + flb_free(ctx->tag_key); + } + + if (ctx->message_key) { + flb_free(ctx->message_key); + } + + flb_upstream_destroy(ctx->u); + flb_free(ctx); + + return 0; +} diff --git a/fluent-bit/plugins/out_kafka_rest/kafka_conf.h b/fluent-bit/plugins/out_kafka_rest/kafka_conf.h new file mode 100644 index 000000000..1d80445b3 --- /dev/null +++ b/fluent-bit/plugins/out_kafka_rest/kafka_conf.h @@ -0,0 +1,33 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_KAFKA_REST_CONF_H +#define FLB_OUT_KAFKA_REST_CONF_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_output.h> + +#include "kafka.h" + +struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_kr_conf_destroy(struct flb_kafka_rest *ctx); + +#endif |