diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_vivo_exporter/vivo.c')
-rw-r--r-- | src/fluent-bit/plugins/out_vivo_exporter/vivo.c | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_vivo_exporter/vivo.c b/src/fluent-bit/plugins/out_vivo_exporter/vivo.c new file mode 100644 index 000000000..85e1e0159 --- /dev/null +++ b/src/fluent-bit/plugins/out_vivo_exporter/vivo.c @@ -0,0 +1,343 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include "vivo.h" +#include "vivo_http.h" +#include "vivo_stream.h" + +static flb_sds_t format_logs(struct flb_event_chunk *event_chunk) +{ + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int result; + int i; + flb_sds_t out_js; + flb_sds_t out_buf = NULL; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + + result = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (result != FLB_EVENT_DECODER_SUCCESS) { + return NULL; + } + + out_buf = flb_sds_create_size((event_chunk->size * 2) / 4); + if (!out_buf) { + flb_errno(); + return NULL; + } + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + while ((result = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + /* + * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data + * by using the following structure: + * + * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}] + */ + msgpack_pack_array(&tmp_pck, 2); + msgpack_pack_array(&tmp_pck, 2); + msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp)); + + /* add tag only */ + msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size); + + msgpack_pack_str(&tmp_pck, 4); + msgpack_pack_str_body(&tmp_pck, "_tag", 4); + + msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); + msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); + + /* Append remaining keys/values */ + for (i = 0; + i < log_event.metadata->via.map.size; + i++) { + msgpack_pack_object(&tmp_pck, + log_event.metadata->via.map.ptr[i].key); + msgpack_pack_object(&tmp_pck, + log_event.metadata->via.map.ptr[i].val); + } + + /* pack the remaining content */ + msgpack_pack_map(&tmp_pck, log_event.body->via.map.size); + + /* Append remaining keys/values */ + for (i = 0; + i < log_event.body->via.map.size; + i++) { + msgpack_pack_object(&tmp_pck, + log_event.body->via.map.ptr[i].key); + msgpack_pack_object(&tmp_pck, + log_event.body->via.map.ptr[i].val); + } + + /* Concatenate by using break lines */ + out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); + if (!out_js) { + flb_sds_destroy(out_buf); + msgpack_sbuffer_destroy(&tmp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + return NULL; + } + + /* + * One map record has been converted, now append it to the + * outgoing out_buf sds variable. + */ + flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js)); + flb_sds_cat_safe(&out_buf, "\n", 1); + + flb_sds_destroy(out_js); + msgpack_sbuffer_clear(&tmp_sbuf); + } + + /* Release the unpacker */ + flb_log_event_decoder_destroy(&log_decoder); + + msgpack_sbuffer_destroy(&tmp_sbuf); + + return out_buf; +} + +static int logs_event_chunk_append(struct vivo_exporter *ctx, + struct flb_event_chunk *event_chunk) +{ + size_t len; + flb_sds_t json; + struct vivo_stream_entry *entry; + + + json = format_logs(event_chunk); + if (!json) { + flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON"); + return -1; + } + + /* append content to the stream */ + len = flb_sds_len(json); + entry = vivo_stream_append(ctx->stream_logs, json, len); + + flb_sds_destroy(json); + + if (!entry) { + flb_plg_error(ctx->ins, "cannot append JSON log to stream"); + return -1; + } + + return 0; +} + +static int metrics_traces_event_chunk_append(struct vivo_exporter *ctx, + struct vivo_stream *vs, + struct flb_event_chunk *event_chunk) +{ + size_t len; + flb_sds_t json; + struct vivo_stream_entry *entry; + + /* Convert msgpack to readable JSON format */ + json = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size); + if (!json) { + flb_plg_error(ctx->ins, "cannot convert metrics chunk to JSON"); + return -1; + } + + flb_sds_cat_safe(&json, "\n", 1); + + /* append content to the stream */ + len = flb_sds_len(json); + entry = vivo_stream_append(vs, json, len); + + flb_sds_destroy(json); + + if (!entry) { + flb_plg_error(ctx->ins, "cannot append JSON log to stream"); + return -1; + } + + return 0; +} + +static int cb_vivo_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + struct vivo_exporter *ctx; + + flb_output_net_default("0.0.0.0", 2025 , ins); + + ctx = flb_calloc(1, sizeof(struct vivo_exporter)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + flb_output_set_context(ins, ctx); + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + return -1; + } + + /* Create Streams */ + ctx->stream_logs = vivo_stream_create(ctx); + if (!ctx->stream_logs) { + return -1; + } + + ctx->stream_metrics = vivo_stream_create(ctx); + if (!ctx->stream_metrics) { + return -1; + } + + ctx->stream_traces = vivo_stream_create(ctx); + if (!ctx->stream_traces) { + return -1; + } + + /* HTTP Server context */ + ctx->http = vivo_http_server_create(ctx, + ins->host.name, ins->host.port, config); + if (!ctx->http) { + flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting"); + return -1; + } + + /* Start HTTP Server */ + ret = vivo_http_server_start(ctx->http); + if (ret == -1) { + return -1; + } + + flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d", + ins->host.name, ins->host.port); + + return 0; +} + +static void cb_vivo_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) +{ + int ret = -1; + struct vivo_exporter *ctx = out_context; + +#ifdef FLB_HAVE_METRICS + if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { + ret = metrics_traces_event_chunk_append(ctx, ctx->stream_metrics, event_chunk); + } +#endif + if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { + ret = logs_event_chunk_append(ctx, event_chunk); + } + else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { + ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk); + } + + if (ret == 0) { + FLB_OUTPUT_RETURN(FLB_OK); + } + + FLB_OUTPUT_RETURN(FLB_ERROR); +} + +static int cb_vivo_exit(void *data, struct flb_config *config) +{ + struct vivo_exporter *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->http) { + vivo_http_server_stop(ctx->http); + vivo_http_server_destroy(ctx->http); + } + + vivo_stream_destroy(ctx->stream_logs); + vivo_stream_destroy(ctx->stream_metrics); + vivo_stream_destroy(ctx->stream_traces); + + flb_free(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "empty_stream_on_read", "off", + 0, FLB_TRUE, offsetof(struct vivo_exporter, empty_stream_on_read), + "If enabled, when an HTTP client consumes the data from a stream, the queue " + "content will be removed" + }, + + { + FLB_CONFIG_MAP_SIZE, "stream_queue_size", "20M", + 0, FLB_TRUE, offsetof(struct vivo_exporter, stream_queue_size), + "Specify the maximum queue size per stream. Each specific stream for logs, metrics " + "and traces can hold up to 'stream_queue_size' bytes." + }, + + { + FLB_CONFIG_MAP_STR, "http_cors_allow_origin", NULL, + 0, FLB_TRUE, offsetof(struct vivo_exporter, http_cors_allow_origin), + "Specify the value for the HTTP Access-Control-Allow-Origin header (CORS)" + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_output_plugin out_vivo_exporter_plugin = { + .name = "vivo_exporter", + .description = "Vivo Exporter", + .cb_init = cb_vivo_init, + .cb_flush = cb_vivo_flush, + .cb_exit = cb_vivo_exit, + .flags = FLB_OUTPUT_NET, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES, + .config_map = config_map, + .workers = 1, +}; |