summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_vivo_exporter
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_vivo_exporter')
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt15
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo.c343
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo.h45
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo_http.c266
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo_http.h56
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo_stream.c239
-rw-r--r--fluent-bit/plugins/out_vivo_exporter/vivo_stream.h59
7 files changed, 1023 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt b/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt
new file mode 100644
index 000000000..e458b1ff4
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt
@@ -0,0 +1,15 @@
+if(NOT FLB_HTTP_SERVER)
+ message(
+ FATAL_ERROR
+ "Vivo Exporter output plugin requires built-in HTTP Server be enabled:
+ Use -DFLB_HTTP_SERVER=On option to enable it"
+ )
+endif()
+
+set(src
+ vivo_http.c
+ vivo_stream.c
+ vivo.c
+ )
+
+FLB_PLUGIN(out_vivo_exporter "${src}" "")
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo.c b/fluent-bit/plugins/out_vivo_exporter/vivo.c
new file mode 100644
index 000000000..85e1e0159
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo.c
@@ -0,0 +1,343 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include "vivo.h"
+#include "vivo_http.h"
+#include "vivo_stream.h"
+
+static flb_sds_t format_logs(struct flb_event_chunk *event_chunk)
+{
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int result;
+ int i;
+ flb_sds_t out_js;
+ flb_sds_t out_buf = NULL;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+
+ result = flb_log_event_decoder_init(&log_decoder,
+ (char *) event_chunk->data,
+ event_chunk->size);
+
+ if (result != FLB_EVENT_DECODER_SUCCESS) {
+ return NULL;
+ }
+
+ out_buf = flb_sds_create_size((event_chunk->size * 2) / 4);
+ if (!out_buf) {
+ flb_errno();
+ return NULL;
+ }
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ while ((result = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ /*
+ * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data
+ * by using the following structure:
+ *
+ * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}]
+ */
+ msgpack_pack_array(&tmp_pck, 2);
+ msgpack_pack_array(&tmp_pck, 2);
+ msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp));
+
+ /* add tag only */
+ msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size);
+
+ msgpack_pack_str(&tmp_pck, 4);
+ msgpack_pack_str_body(&tmp_pck, "_tag", 4);
+
+ msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag));
+ msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag));
+
+ /* Append remaining keys/values */
+ for (i = 0;
+ i < log_event.metadata->via.map.size;
+ i++) {
+ msgpack_pack_object(&tmp_pck,
+ log_event.metadata->via.map.ptr[i].key);
+ msgpack_pack_object(&tmp_pck,
+ log_event.metadata->via.map.ptr[i].val);
+ }
+
+ /* pack the remaining content */
+ msgpack_pack_map(&tmp_pck, log_event.body->via.map.size);
+
+ /* Append remaining keys/values */
+ for (i = 0;
+ i < log_event.body->via.map.size;
+ i++) {
+ msgpack_pack_object(&tmp_pck,
+ log_event.body->via.map.ptr[i].key);
+ msgpack_pack_object(&tmp_pck,
+ log_event.body->via.map.ptr[i].val);
+ }
+
+ /* Concatenate by using break lines */
+ out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
+ if (!out_js) {
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ flb_log_event_decoder_destroy(&log_decoder);
+ return NULL;
+ }
+
+ /*
+ * One map record has been converted, now append it to the
+ * outgoing out_buf sds variable.
+ */
+ flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js));
+ flb_sds_cat_safe(&out_buf, "\n", 1);
+
+ flb_sds_destroy(out_js);
+ msgpack_sbuffer_clear(&tmp_sbuf);
+ }
+
+ /* Release the unpacker */
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+
+ return out_buf;
+}
+
+static int logs_event_chunk_append(struct vivo_exporter *ctx,
+ struct flb_event_chunk *event_chunk)
+{
+ size_t len;
+ flb_sds_t json;
+ struct vivo_stream_entry *entry;
+
+
+ json = format_logs(event_chunk);
+ if (!json) {
+ flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON");
+ return -1;
+ }
+
+ /* append content to the stream */
+ len = flb_sds_len(json);
+ entry = vivo_stream_append(ctx->stream_logs, json, len);
+
+ flb_sds_destroy(json);
+
+ if (!entry) {
+ flb_plg_error(ctx->ins, "cannot append JSON log to stream");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int metrics_traces_event_chunk_append(struct vivo_exporter *ctx,
+ struct vivo_stream *vs,
+ struct flb_event_chunk *event_chunk)
+{
+ size_t len;
+ flb_sds_t json;
+ struct vivo_stream_entry *entry;
+
+ /* Convert msgpack to readable JSON format */
+ json = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size);
+ if (!json) {
+ flb_plg_error(ctx->ins, "cannot convert metrics chunk to JSON");
+ return -1;
+ }
+
+ flb_sds_cat_safe(&json, "\n", 1);
+
+ /* append content to the stream */
+ len = flb_sds_len(json);
+ entry = vivo_stream_append(vs, json, len);
+
+ flb_sds_destroy(json);
+
+ if (!entry) {
+ flb_plg_error(ctx->ins, "cannot append JSON log to stream");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int cb_vivo_init(struct flb_output_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ struct vivo_exporter *ctx;
+
+ flb_output_net_default("0.0.0.0", 2025 , ins);
+
+ ctx = flb_calloc(1, sizeof(struct vivo_exporter));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = ins;
+
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ flb_output_set_context(ins, ctx);
+
+ /* Load config map */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Create Streams */
+ ctx->stream_logs = vivo_stream_create(ctx);
+ if (!ctx->stream_logs) {
+ return -1;
+ }
+
+ ctx->stream_metrics = vivo_stream_create(ctx);
+ if (!ctx->stream_metrics) {
+ return -1;
+ }
+
+ ctx->stream_traces = vivo_stream_create(ctx);
+ if (!ctx->stream_traces) {
+ return -1;
+ }
+
+ /* HTTP Server context */
+ ctx->http = vivo_http_server_create(ctx,
+ ins->host.name, ins->host.port, config);
+ if (!ctx->http) {
+ flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting");
+ return -1;
+ }
+
+ /* Start HTTP Server */
+ ret = vivo_http_server_start(ctx->http);
+ if (ret == -1) {
+ return -1;
+ }
+
+ flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d",
+ ins->host.name, ins->host.port);
+
+ return 0;
+}
+
+static void cb_vivo_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *ins, void *out_context,
+ struct flb_config *config)
+{
+ int ret = -1;
+ struct vivo_exporter *ctx = out_context;
+
+#ifdef FLB_HAVE_METRICS
+ if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
+ ret = metrics_traces_event_chunk_append(ctx, ctx->stream_metrics, event_chunk);
+ }
+#endif
+ if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
+ ret = logs_event_chunk_append(ctx, event_chunk);
+ }
+ else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) {
+ ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk);
+ }
+
+ if (ret == 0) {
+ FLB_OUTPUT_RETURN(FLB_OK);
+ }
+
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+}
+
+static int cb_vivo_exit(void *data, struct flb_config *config)
+{
+ struct vivo_exporter *ctx = data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->http) {
+ vivo_http_server_stop(ctx->http);
+ vivo_http_server_destroy(ctx->http);
+ }
+
+ vivo_stream_destroy(ctx->stream_logs);
+ vivo_stream_destroy(ctx->stream_metrics);
+ vivo_stream_destroy(ctx->stream_traces);
+
+ flb_free(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_BOOL, "empty_stream_on_read", "off",
+ 0, FLB_TRUE, offsetof(struct vivo_exporter, empty_stream_on_read),
+ "If enabled, when an HTTP client consumes the data from a stream, the queue "
+ "content will be removed"
+ },
+
+ {
+ FLB_CONFIG_MAP_SIZE, "stream_queue_size", "20M",
+ 0, FLB_TRUE, offsetof(struct vivo_exporter, stream_queue_size),
+ "Specify the maximum queue size per stream. Each specific stream for logs, metrics "
+ "and traces can hold up to 'stream_queue_size' bytes."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "http_cors_allow_origin", NULL,
+ 0, FLB_TRUE, offsetof(struct vivo_exporter, http_cors_allow_origin),
+ "Specify the value for the HTTP Access-Control-Allow-Origin header (CORS)"
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_output_plugin out_vivo_exporter_plugin = {
+ .name = "vivo_exporter",
+ .description = "Vivo Exporter",
+ .cb_init = cb_vivo_init,
+ .cb_flush = cb_vivo_flush,
+ .cb_exit = cb_vivo_exit,
+ .flags = FLB_OUTPUT_NET,
+ .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES,
+ .config_map = config_map,
+ .workers = 1,
+};
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo.h b/fluent-bit/plugins/out_vivo_exporter/vivo.h
new file mode 100644
index 000000000..943c40364
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo.h
@@ -0,0 +1,45 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_VIVO_EXPORTER_H
+#define FLB_VIVO_EXPORTER_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_ring_buffer.h>
+
+#define VIVO_RING_BUFFER_SIZE 10
+
+/* Plugin context */
+struct vivo_exporter {
+ void *http;
+
+ void *stream_logs;
+ void *stream_metrics;
+ void *stream_traces;
+
+ /* options */
+ int empty_stream_on_read;
+ size_t stream_queue_size;
+ flb_sds_t http_cors_allow_origin;
+
+ /* instance context */
+ struct flb_output_instance *ins;
+};
+
+#endif
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_http.c b/fluent-bit/plugins/out_vivo_exporter/vivo_http.c
new file mode 100644
index 000000000..efd39dcc8
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo_http.c
@@ -0,0 +1,266 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_http_server.h>
+
+#include "vivo.h"
+#include "vivo_http.h"
+#include "vivo_stream.h"
+
+#define VIVO_CONTENT_TYPE "Content-Type"
+#define VIVO_CONTENT_TYPE_JSON "application/json"
+#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID"
+#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID"
+
+static int stream_get_uri_properties(mk_request_t *request,
+ int64_t *from, int64_t *to, int64_t *limit)
+{
+ char *ptr;
+ flb_sds_t buf;
+
+ *from = -1;
+ *to = -1;
+ *limit = -1;
+
+ buf = flb_sds_create_len(request->query_string.data, request->query_string.len);
+ if (!buf) {
+ return -1;
+ }
+
+ ptr = strstr(buf, "from=");
+ if (ptr) {
+ *from = atol(ptr + 5);
+ }
+
+ ptr = strstr(buf, "to=");
+ if (ptr) {
+ *to = atol(ptr + 3);
+ }
+
+ ptr = strstr(buf, "limit=");
+ if (ptr) {
+ *limit = atol(ptr + 6);
+ }
+
+ flb_sds_destroy(buf);
+
+ return 0;
+}
+
+static void headers_set(mk_request_t *request, struct vivo_stream *vs)
+{
+ struct vivo_exporter *ctx;
+
+
+ /* parent context */
+ ctx = vs->parent;
+
+ /* content type */
+ mk_http_header(request,
+ VIVO_CONTENT_TYPE, sizeof(VIVO_CONTENT_TYPE) - 1,
+ VIVO_CONTENT_TYPE_JSON, sizeof(VIVO_CONTENT_TYPE_JSON) - 1);
+
+ /* CORS */
+ if (ctx->http_cors_allow_origin) {
+ mk_http_header(request,
+ "Access-Control-Allow-Origin",
+ sizeof("Access-Control-Allow-Origin") - 1,
+ ctx->http_cors_allow_origin,
+ flb_sds_len(ctx->http_cors_allow_origin));
+
+ mk_http_header(request,
+ "Access-Control-Allow-Headers",
+ sizeof("Access-Control-Allow-Headers") - 1,
+ "Origin, X-Requested-With, Content-Type, Accept",
+ sizeof("Origin, X-Requested-With, Content-Type, Accept") - 1);
+
+ mk_http_header(request,
+ "Access-Control-Expose-Headers",
+ sizeof("Access-Control-Expose-Headers") - 1,
+ "vivo-stream-start-id, vivo-stream-end-id",
+ sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1);
+
+ }
+}
+
+static void serve_content(mk_request_t *request, struct vivo_stream *vs)
+{
+ int64_t from = -1;
+ int64_t to = -1;
+ int64_t limit = -1;
+ int64_t stream_start_id = -1;
+ int64_t stream_end_id = -1;
+ flb_sds_t payload;
+ flb_sds_t str_start;
+ flb_sds_t str_end;
+
+
+ if (request->query_string.len > 0) {
+ stream_get_uri_properties(request, &from, &to, &limit);
+ }
+
+ payload = vivo_stream_get_content(vs, from, to, limit,
+ &stream_start_id, &stream_end_id);
+ if (!payload) {
+ mk_http_status(request, 500);
+ return;
+ }
+
+ if (flb_sds_len(payload) == 0) {
+ mk_http_status(request, 200);
+ headers_set(request, vs);
+ flb_sds_destroy(payload);
+ return;
+ }
+
+ mk_http_status(request, 200);
+
+ /* set response headers */
+ headers_set(request, vs);
+
+ /* stream ids served: compose buffer and set headers */
+ str_start = flb_sds_create_size(32);
+ flb_sds_printf(&str_start, "%" PRId64, stream_start_id);
+
+ str_end = flb_sds_create_size(32);
+ flb_sds_printf(&str_end, "%" PRId64, stream_end_id);
+
+ mk_http_header(request,
+ VIVO_STREAM_START_ID, sizeof(VIVO_STREAM_START_ID) - 1,
+ str_start, flb_sds_len(str_start));
+
+ mk_http_header(request,
+ VIVO_STREAM_END_ID, sizeof(VIVO_STREAM_END_ID) - 1,
+ str_end, flb_sds_len(str_end));
+
+ /* send payload */
+ mk_http_send(request, payload, flb_sds_len(payload), NULL);
+
+ /* release */
+ flb_sds_destroy(payload);
+ flb_sds_destroy(str_start);
+ flb_sds_destroy(str_end);
+}
+
+/* HTTP endpoint: /logs */
+static void cb_logs(mk_request_t *request, void *data)
+{
+ struct vivo_exporter *ctx;
+
+ ctx = (struct vivo_exporter *) data;
+
+ serve_content(request, ctx->stream_logs);
+ mk_http_done(request);
+}
+
+/* HTTP endpoint: /metrics */
+static void cb_metrics(mk_request_t *request, void *data)
+{
+ struct vivo_exporter *ctx;
+
+ ctx = (struct vivo_exporter *) data;
+
+ serve_content(request, ctx->stream_metrics);
+ mk_http_done(request);
+}
+
+static void cb_traces(mk_request_t *request, void *data)
+{
+ struct vivo_exporter *ctx;
+
+ ctx = (struct vivo_exporter *) data;
+
+ serve_content(request, ctx->stream_traces);
+ mk_http_done(request);
+}
+
+/* HTTP endpoint: / (root) */
+static void cb_root(mk_request_t *request, void *data)
+{
+ (void) data;
+
+ mk_http_status(request, 200);
+ mk_http_send(request, "Fluent Bit Vivo Exporter\n", 24, NULL);
+ mk_http_done(request);
+}
+
+struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx,
+ const char *listen,
+ int tcp_port,
+ struct flb_config *config)
+{
+ int vid;
+ char tmp[32];
+ struct vivo_http *ph;
+
+ ph = flb_malloc(sizeof(struct vivo_http));
+ if (!ph) {
+ flb_errno();
+ return NULL;
+ }
+ ph->config = config;
+
+ /* HTTP Server context */
+ ph->ctx = mk_create();
+ if (!ph->ctx) {
+ flb_free(ph);
+ return NULL;
+ }
+
+ /* Compose listen address */
+ snprintf(tmp, sizeof(tmp) -1, "%s:%d", listen, tcp_port);
+ mk_config_set(ph->ctx,
+ "Listen", tmp,
+ "Workers", "1",
+ NULL);
+
+ /* Virtual host */
+ vid = mk_vhost_create(ph->ctx, NULL);
+ ph->vid = vid;
+
+ /* Set HTTP URI callbacks */
+ mk_vhost_handler(ph->ctx, vid, "/logs", cb_logs, ctx);
+ mk_vhost_handler(ph->ctx, vid, "/metrics", cb_metrics, ctx);
+ mk_vhost_handler(ph->ctx, vid, "/traces", cb_traces, ctx);
+ mk_vhost_handler(ph->ctx, vid, "/", cb_root, NULL);
+
+ return ph;
+}
+
+void vivo_http_server_destroy(struct vivo_http *ph)
+{
+ if (ph) {
+ /* TODO: release mk_vhost */
+ if (ph->ctx) {
+ mk_destroy(ph->ctx);
+ }
+ flb_free(ph);
+ }
+}
+
+int vivo_http_server_start(struct vivo_http *ph)
+{
+ return mk_start(ph->ctx);
+}
+
+int vivo_http_server_stop(struct vivo_http *ph)
+{
+ return mk_stop(ph->ctx);
+}
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_http.h b/fluent-bit/plugins/out_vivo_exporter/vivo_http.h
new file mode 100644
index 000000000..77453d289
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo_http.h
@@ -0,0 +1,56 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_VIVO_EXPORTER_HTTP_H
+#define FLB_VIVO_EXPORTER_HTTP_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <monkey/mk_lib.h>
+
+#include "vivo.h"
+
+/* HTTP response payload received through a Message Queue */
+struct vivo_http_buf {
+ int users;
+ char *buf_data;
+ size_t buf_size;
+ struct mk_list _head;
+};
+
+/* Vivo HTTP Server context */
+struct vivo_http {
+ mk_ctx_t *ctx; /* Monkey HTTP Context */
+ int vid; /* Virtual host ID */
+ int qid_metrics; /* Queue ID for Metrics buffer */
+ struct flb_config *config; /* Fluent Bit context */
+};
+
+struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx,
+ const char *listen,
+ int tcp_port,
+ struct flb_config *config);
+void vivo_http_server_destroy(struct vivo_http *ph);
+
+int vivo_http_server_start(struct vivo_http *ph);
+int vivo_http_server_stop(struct vivo_http *ph);
+
+int vivo_http_server_mq_push_metrics(struct vivo_http *ph,
+ void *data, size_t size);
+
+#endif
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c
new file mode 100644
index 000000000..9c8edb9ea
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c
@@ -0,0 +1,239 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_sds.h>
+
+#include "vivo.h"
+#include "vivo_stream.h"
+
+static inline void stream_lock(struct vivo_stream *vs)
+{
+ pthread_mutex_lock(&vs->stream_mutex);
+}
+
+static inline void stream_unlock(struct vivo_stream *vs)
+{
+ pthread_mutex_unlock(&vs->stream_mutex);
+}
+
+struct vivo_stream *vivo_stream_create(struct vivo_exporter *ctx)
+{
+ struct vivo_stream *vs;
+
+ vs = flb_calloc(1, sizeof(struct vivo_stream));
+ if (!vs) {
+ flb_errno();
+ return NULL;
+ }
+ vs->parent = ctx;
+ vs->entries_added = 0;
+ pthread_mutex_init(&vs->stream_mutex, NULL);
+ mk_list_init(&vs->entries);
+ mk_list_init(&vs->purge);
+
+ return vs;
+}
+
+static uint64_t vivo_stream_get_new_id(struct vivo_stream *vs)
+{
+ uint64_t id = 0;
+
+ stream_lock(vs);
+
+ /* to get the next id, we simply use the value of the counter 'entries' added */
+ id = vs->entries_added;
+
+ stream_unlock(vs);
+
+ return id;
+}
+
+
+struct vivo_stream_entry *vivo_stream_entry_create(struct vivo_stream *vs,
+ void *data, size_t size)
+{
+ struct vivo_stream_entry *e;
+
+ if (size == 0) {
+ return NULL;
+ }
+
+ e = flb_calloc(1, sizeof(struct vivo_stream_entry));
+ if (!e) {
+ flb_errno();
+ return NULL;
+ }
+ e->id = vivo_stream_get_new_id(vs);
+
+ e->data = flb_sds_create_len(data, size);
+ if (!e->data) {
+ flb_free(e);
+ return NULL;
+ }
+
+ return e;
+}
+
+/*
+ * NOTE: this function must always invoked under the stream_mutex in a locked state, we don't do the lock
+ * inside the function since the caller might be itering the parent list
+ */
+static void vivo_stream_entry_destroy(struct vivo_stream *vs, struct vivo_stream_entry *e)
+{
+ mk_list_del(&e->_head);
+ vs->current_bytes_size -= flb_sds_len(e->data);
+ flb_sds_destroy(e->data);
+ flb_free(e);
+}
+
+/* NOTE: this function must run inside a stream_lock()/stream_unlock() protection */
+static void vivo_stream_cleanup(struct vivo_stream *vs)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct vivo_stream_entry *e;
+
+ mk_list_foreach_safe(head, tmp, &vs->entries) {
+ e = mk_list_entry(head, struct vivo_stream_entry, _head);
+ vivo_stream_entry_destroy(vs, e);
+ }
+}
+
+void vivo_stream_destroy(struct vivo_stream *vs)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct vivo_stream_entry *e;
+
+ stream_lock(vs);
+ mk_list_foreach_safe(head, tmp, &vs->entries) {
+ e = mk_list_entry(head, struct vivo_stream_entry, _head);
+ vivo_stream_entry_destroy(vs, e);
+ }
+ stream_unlock(vs);
+
+ flb_free(vs);
+}
+
+flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to,
+ int64_t limit,
+ int64_t *stream_start_id, int64_t *stream_end_id)
+{
+ int64_t count = 0;
+ flb_sds_t buf;
+ struct mk_list *head;
+ struct vivo_stream_entry *e;
+ struct vivo_exporter *ctx = vs->parent;
+
+ buf = flb_sds_create_size(vs->current_bytes_size);
+ if (!buf) {
+ return NULL;
+ }
+
+ stream_lock(vs);
+
+ mk_list_foreach(head, &vs->entries) {
+ e = mk_list_entry(head, struct vivo_stream_entry, _head);
+
+ if (e->id < from && from != -1) {
+ continue;
+ }
+
+ if (e->id > to && to != -1 && to != 0) {
+ break;
+ }
+
+ if (count == 0) {
+ *stream_start_id = e->id;
+ }
+
+ flb_sds_cat_safe(&buf, e->data, flb_sds_len(e->data));
+
+ *stream_end_id = e->id;
+ count++;
+
+ if (limit > 0 && count >= limit) {
+ break;
+ }
+ }
+
+ if (ctx->empty_stream_on_read) {
+ vivo_stream_cleanup(vs);
+ }
+
+ stream_unlock(vs);
+
+ return buf;
+}
+
+/* Remove entries from the stream until cleanup 'size' bytes. This function is inside a stream_lock()/stream_unlock() */
+static void vivo_stream_make_room(struct vivo_stream *vs, size_t size)
+{
+ size_t deleted = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct vivo_stream_entry *e;
+
+ mk_list_foreach_safe(head, tmp, &vs->entries) {
+ e = mk_list_entry(head, struct vivo_stream_entry, _head);
+ deleted += flb_sds_len(e->data);
+ vivo_stream_entry_destroy(vs, e);
+ if (deleted >= size) {
+ break;
+ }
+ }
+}
+
+struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data, size_t size)
+{
+ struct vivo_stream_entry *e;
+ struct vivo_exporter *ctx = vs->parent;
+
+ e = vivo_stream_entry_create(vs, data, size);
+ if (!e) {
+ return NULL;
+ }
+
+ stream_lock(vs);
+
+ /* check queue space */
+ if (vs->current_bytes_size + size > ctx->stream_queue_size) {
+ /* free up some space */
+ if (mk_list_size(&vs->entries) == 0) {
+ /* do nothing, the user size setup is smaller that the incoming size, let it pass */
+ }
+ else {
+ /* release at least 'size' bytes */
+ vivo_stream_make_room(vs, size);
+ }
+ }
+
+ /* add entry to the end of the list */
+ mk_list_add(&e->_head, &vs->entries);
+
+ vs->entries_added++;
+ vs->current_bytes_size += size;
+
+ stream_unlock(vs);
+
+ return e;
+}
diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h
new file mode 100644
index 000000000..fb0ca6053
--- /dev/null
+++ b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h
@@ -0,0 +1,59 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_VIVO_STREAM_H
+#define FLB_VIVO_STREAM_H
+
+#include <fluent-bit/flb_info.h>
+
+#include "vivo.h"
+
+struct vivo_stream_entry {
+ int64_t id;
+ flb_sds_t data;
+ struct mk_list _head;
+};
+
+struct vivo_stream {
+ size_t entries_added;
+
+ size_t current_bytes_size;
+
+ struct mk_list entries;
+ struct mk_list purge;
+
+ /* mutex to protect the context */
+ pthread_mutex_t stream_mutex;
+
+ /* back reference to struct vivo_exporter context */
+ void *parent;
+};
+
+
+struct vivo_stream *vivo_stream_create(struct vivo_exporter *ctx);
+void vivo_stream_destroy(struct vivo_stream *vs);
+struct vivo_stream_entry *vivo_stream_entry_create(struct vivo_stream *vs,
+ void *data, size_t size);
+struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data,
+ size_t size);
+flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to,
+ int64_t limit,
+ int64_t *stream_start_id, int64_t *stream_end_id);
+
+#endif