diff options
Diffstat (limited to 'fluent-bit/plugins/out_vivo_exporter')
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt | 15 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo.c | 343 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo.h | 45 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo_http.c | 266 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo_http.h | 56 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo_stream.c | 239 | ||||
-rw-r--r-- | fluent-bit/plugins/out_vivo_exporter/vivo_stream.h | 59 |
7 files changed, 1023 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt b/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt new file mode 100644 index 000000000..e458b1ff4 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/CMakeLists.txt @@ -0,0 +1,15 @@ +if(NOT FLB_HTTP_SERVER) + message( + FATAL_ERROR + "Vivo Exporter output plugin requires built-in HTTP Server be enabled: + Use -DFLB_HTTP_SERVER=On option to enable it" + ) +endif() + +set(src + vivo_http.c + vivo_stream.c + vivo.c + ) + +FLB_PLUGIN(out_vivo_exporter "${src}" "") diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo.c b/fluent-bit/plugins/out_vivo_exporter/vivo.c new file mode 100644 index 000000000..85e1e0159 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo.c @@ -0,0 +1,343 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include "vivo.h" +#include "vivo_http.h" +#include "vivo_stream.h" + +static flb_sds_t format_logs(struct flb_event_chunk *event_chunk) +{ + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int result; + int i; + flb_sds_t out_js; + flb_sds_t out_buf = NULL; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + + result = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (result != FLB_EVENT_DECODER_SUCCESS) { + return NULL; + } + + out_buf = flb_sds_create_size((event_chunk->size * 2) / 4); + if (!out_buf) { + flb_errno(); + return NULL; + } + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + while ((result = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + /* + * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data + * by using the following structure: + * + * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}] + */ + msgpack_pack_array(&tmp_pck, 2); + msgpack_pack_array(&tmp_pck, 2); + msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp)); + + /* add tag only */ + msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size); + + msgpack_pack_str(&tmp_pck, 4); + msgpack_pack_str_body(&tmp_pck, "_tag", 4); + + msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); + msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); + + /* Append remaining keys/values */ + for (i = 0; + i < log_event.metadata->via.map.size; + i++) { + msgpack_pack_object(&tmp_pck, + log_event.metadata->via.map.ptr[i].key); + msgpack_pack_object(&tmp_pck, + log_event.metadata->via.map.ptr[i].val); + } + + /* pack the remaining content */ + msgpack_pack_map(&tmp_pck, log_event.body->via.map.size); + + /* Append remaining keys/values */ + for (i = 0; + i < log_event.body->via.map.size; + i++) { + msgpack_pack_object(&tmp_pck, + log_event.body->via.map.ptr[i].key); + msgpack_pack_object(&tmp_pck, + log_event.body->via.map.ptr[i].val); + } + + /* Concatenate by using break lines */ + out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); + if (!out_js) { + flb_sds_destroy(out_buf); + msgpack_sbuffer_destroy(&tmp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + return NULL; + } + + /* + * One map record has been converted, now append it to the + * outgoing out_buf sds variable. + */ + flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js)); + flb_sds_cat_safe(&out_buf, "\n", 1); + + flb_sds_destroy(out_js); + msgpack_sbuffer_clear(&tmp_sbuf); + } + + /* Release the unpacker */ + flb_log_event_decoder_destroy(&log_decoder); + + msgpack_sbuffer_destroy(&tmp_sbuf); + + return out_buf; +} + +static int logs_event_chunk_append(struct vivo_exporter *ctx, + struct flb_event_chunk *event_chunk) +{ + size_t len; + flb_sds_t json; + struct vivo_stream_entry *entry; + + + json = format_logs(event_chunk); + if (!json) { + flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON"); + return -1; + } + + /* append content to the stream */ + len = flb_sds_len(json); + entry = vivo_stream_append(ctx->stream_logs, json, len); + + flb_sds_destroy(json); + + if (!entry) { + flb_plg_error(ctx->ins, "cannot append JSON log to stream"); + return -1; + } + + return 0; +} + +static int metrics_traces_event_chunk_append(struct vivo_exporter *ctx, + struct vivo_stream *vs, + struct flb_event_chunk *event_chunk) +{ + size_t len; + flb_sds_t json; + struct vivo_stream_entry *entry; + + /* Convert msgpack to readable JSON format */ + json = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size); + if (!json) { + flb_plg_error(ctx->ins, "cannot convert metrics chunk to JSON"); + return -1; + } + + flb_sds_cat_safe(&json, "\n", 1); + + /* append content to the stream */ + len = flb_sds_len(json); + entry = vivo_stream_append(vs, json, len); + + flb_sds_destroy(json); + + if (!entry) { + flb_plg_error(ctx->ins, "cannot append JSON log to stream"); + return -1; + } + + return 0; +} + +static int cb_vivo_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + struct vivo_exporter *ctx; + + flb_output_net_default("0.0.0.0", 2025 , ins); + + ctx = flb_calloc(1, sizeof(struct vivo_exporter)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + flb_output_set_context(ins, ctx); + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + return -1; + } + + /* Create Streams */ + ctx->stream_logs = vivo_stream_create(ctx); + if (!ctx->stream_logs) { + return -1; + } + + ctx->stream_metrics = vivo_stream_create(ctx); + if (!ctx->stream_metrics) { + return -1; + } + + ctx->stream_traces = vivo_stream_create(ctx); + if (!ctx->stream_traces) { + return -1; + } + + /* HTTP Server context */ + ctx->http = vivo_http_server_create(ctx, + ins->host.name, ins->host.port, config); + if (!ctx->http) { + flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting"); + return -1; + } + + /* Start HTTP Server */ + ret = vivo_http_server_start(ctx->http); + if (ret == -1) { + return -1; + } + + flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d", + ins->host.name, ins->host.port); + + return 0; +} + +static void cb_vivo_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) +{ + int ret = -1; + struct vivo_exporter *ctx = out_context; + +#ifdef FLB_HAVE_METRICS + if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { + ret = metrics_traces_event_chunk_append(ctx, ctx->stream_metrics, event_chunk); + } +#endif + if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { + ret = logs_event_chunk_append(ctx, event_chunk); + } + else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { + ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk); + } + + if (ret == 0) { + FLB_OUTPUT_RETURN(FLB_OK); + } + + FLB_OUTPUT_RETURN(FLB_ERROR); +} + +static int cb_vivo_exit(void *data, struct flb_config *config) +{ + struct vivo_exporter *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->http) { + vivo_http_server_stop(ctx->http); + vivo_http_server_destroy(ctx->http); + } + + vivo_stream_destroy(ctx->stream_logs); + vivo_stream_destroy(ctx->stream_metrics); + vivo_stream_destroy(ctx->stream_traces); + + flb_free(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "empty_stream_on_read", "off", + 0, FLB_TRUE, offsetof(struct vivo_exporter, empty_stream_on_read), + "If enabled, when an HTTP client consumes the data from a stream, the queue " + "content will be removed" + }, + + { + FLB_CONFIG_MAP_SIZE, "stream_queue_size", "20M", + 0, FLB_TRUE, offsetof(struct vivo_exporter, stream_queue_size), + "Specify the maximum queue size per stream. Each specific stream for logs, metrics " + "and traces can hold up to 'stream_queue_size' bytes." + }, + + { + FLB_CONFIG_MAP_STR, "http_cors_allow_origin", NULL, + 0, FLB_TRUE, offsetof(struct vivo_exporter, http_cors_allow_origin), + "Specify the value for the HTTP Access-Control-Allow-Origin header (CORS)" + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_output_plugin out_vivo_exporter_plugin = { + .name = "vivo_exporter", + .description = "Vivo Exporter", + .cb_init = cb_vivo_init, + .cb_flush = cb_vivo_flush, + .cb_exit = cb_vivo_exit, + .flags = FLB_OUTPUT_NET, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES, + .config_map = config_map, + .workers = 1, +}; diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo.h b/fluent-bit/plugins/out_vivo_exporter/vivo.h new file mode 100644 index 000000000..943c40364 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_VIVO_EXPORTER_H +#define FLB_VIVO_EXPORTER_H + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_ring_buffer.h> + +#define VIVO_RING_BUFFER_SIZE 10 + +/* Plugin context */ +struct vivo_exporter { + void *http; + + void *stream_logs; + void *stream_metrics; + void *stream_traces; + + /* options */ + int empty_stream_on_read; + size_t stream_queue_size; + flb_sds_t http_cors_allow_origin; + + /* instance context */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_http.c b/fluent-bit/plugins/out_vivo_exporter/vivo_http.c new file mode 100644 index 000000000..efd39dcc8 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo_http.c @@ -0,0 +1,266 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_server.h> + +#include "vivo.h" +#include "vivo_http.h" +#include "vivo_stream.h" + +#define VIVO_CONTENT_TYPE "Content-Type" +#define VIVO_CONTENT_TYPE_JSON "application/json" +#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID" +#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID" + +static int stream_get_uri_properties(mk_request_t *request, + int64_t *from, int64_t *to, int64_t *limit) +{ + char *ptr; + flb_sds_t buf; + + *from = -1; + *to = -1; + *limit = -1; + + buf = flb_sds_create_len(request->query_string.data, request->query_string.len); + if (!buf) { + return -1; + } + + ptr = strstr(buf, "from="); + if (ptr) { + *from = atol(ptr + 5); + } + + ptr = strstr(buf, "to="); + if (ptr) { + *to = atol(ptr + 3); + } + + ptr = strstr(buf, "limit="); + if (ptr) { + *limit = atol(ptr + 6); + } + + flb_sds_destroy(buf); + + return 0; +} + +static void headers_set(mk_request_t *request, struct vivo_stream *vs) +{ + struct vivo_exporter *ctx; + + + /* parent context */ + ctx = vs->parent; + + /* content type */ + mk_http_header(request, + VIVO_CONTENT_TYPE, sizeof(VIVO_CONTENT_TYPE) - 1, + VIVO_CONTENT_TYPE_JSON, sizeof(VIVO_CONTENT_TYPE_JSON) - 1); + + /* CORS */ + if (ctx->http_cors_allow_origin) { + mk_http_header(request, + "Access-Control-Allow-Origin", + sizeof("Access-Control-Allow-Origin") - 1, + ctx->http_cors_allow_origin, + flb_sds_len(ctx->http_cors_allow_origin)); + + mk_http_header(request, + "Access-Control-Allow-Headers", + sizeof("Access-Control-Allow-Headers") - 1, + "Origin, X-Requested-With, Content-Type, Accept", + sizeof("Origin, X-Requested-With, Content-Type, Accept") - 1); + + mk_http_header(request, + "Access-Control-Expose-Headers", + sizeof("Access-Control-Expose-Headers") - 1, + "vivo-stream-start-id, vivo-stream-end-id", + sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1); + + } +} + +static void serve_content(mk_request_t *request, struct vivo_stream *vs) +{ + int64_t from = -1; + int64_t to = -1; + int64_t limit = -1; + int64_t stream_start_id = -1; + int64_t stream_end_id = -1; + flb_sds_t payload; + flb_sds_t str_start; + flb_sds_t str_end; + + + if (request->query_string.len > 0) { + stream_get_uri_properties(request, &from, &to, &limit); + } + + payload = vivo_stream_get_content(vs, from, to, limit, + &stream_start_id, &stream_end_id); + if (!payload) { + mk_http_status(request, 500); + return; + } + + if (flb_sds_len(payload) == 0) { + mk_http_status(request, 200); + headers_set(request, vs); + flb_sds_destroy(payload); + return; + } + + mk_http_status(request, 200); + + /* set response headers */ + headers_set(request, vs); + + /* stream ids served: compose buffer and set headers */ + str_start = flb_sds_create_size(32); + flb_sds_printf(&str_start, "%" PRId64, stream_start_id); + + str_end = flb_sds_create_size(32); + flb_sds_printf(&str_end, "%" PRId64, stream_end_id); + + mk_http_header(request, + VIVO_STREAM_START_ID, sizeof(VIVO_STREAM_START_ID) - 1, + str_start, flb_sds_len(str_start)); + + mk_http_header(request, + VIVO_STREAM_END_ID, sizeof(VIVO_STREAM_END_ID) - 1, + str_end, flb_sds_len(str_end)); + + /* send payload */ + mk_http_send(request, payload, flb_sds_len(payload), NULL); + + /* release */ + flb_sds_destroy(payload); + flb_sds_destroy(str_start); + flb_sds_destroy(str_end); +} + +/* HTTP endpoint: /logs */ +static void cb_logs(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_logs); + mk_http_done(request); +} + +/* HTTP endpoint: /metrics */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_metrics); + mk_http_done(request); +} + +static void cb_traces(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_traces); + mk_http_done(request); +} + +/* HTTP endpoint: / (root) */ +static void cb_root(mk_request_t *request, void *data) +{ + (void) data; + + mk_http_status(request, 200); + mk_http_send(request, "Fluent Bit Vivo Exporter\n", 24, NULL); + mk_http_done(request); +} + +struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx, + const char *listen, + int tcp_port, + struct flb_config *config) +{ + int vid; + char tmp[32]; + struct vivo_http *ph; + + ph = flb_malloc(sizeof(struct vivo_http)); + if (!ph) { + flb_errno(); + return NULL; + } + ph->config = config; + + /* HTTP Server context */ + ph->ctx = mk_create(); + if (!ph->ctx) { + flb_free(ph); + return NULL; + } + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) -1, "%s:%d", listen, tcp_port); + mk_config_set(ph->ctx, + "Listen", tmp, + "Workers", "1", + NULL); + + /* Virtual host */ + vid = mk_vhost_create(ph->ctx, NULL); + ph->vid = vid; + + /* Set HTTP URI callbacks */ + mk_vhost_handler(ph->ctx, vid, "/logs", cb_logs, ctx); + mk_vhost_handler(ph->ctx, vid, "/metrics", cb_metrics, ctx); + mk_vhost_handler(ph->ctx, vid, "/traces", cb_traces, ctx); + mk_vhost_handler(ph->ctx, vid, "/", cb_root, NULL); + + return ph; +} + +void vivo_http_server_destroy(struct vivo_http *ph) +{ + if (ph) { + /* TODO: release mk_vhost */ + if (ph->ctx) { + mk_destroy(ph->ctx); + } + flb_free(ph); + } +} + +int vivo_http_server_start(struct vivo_http *ph) +{ + return mk_start(ph->ctx); +} + +int vivo_http_server_stop(struct vivo_http *ph) +{ + return mk_stop(ph->ctx); +} diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_http.h b/fluent-bit/plugins/out_vivo_exporter/vivo_http.h new file mode 100644 index 000000000..77453d289 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo_http.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_VIVO_EXPORTER_HTTP_H +#define FLB_VIVO_EXPORTER_HTTP_H + +#include <fluent-bit/flb_output_plugin.h> +#include <monkey/mk_lib.h> + +#include "vivo.h" + +/* HTTP response payload received through a Message Queue */ +struct vivo_http_buf { + int users; + char *buf_data; + size_t buf_size; + struct mk_list _head; +}; + +/* Vivo HTTP Server context */ +struct vivo_http { + mk_ctx_t *ctx; /* Monkey HTTP Context */ + int vid; /* Virtual host ID */ + int qid_metrics; /* Queue ID for Metrics buffer */ + struct flb_config *config; /* Fluent Bit context */ +}; + +struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx, + const char *listen, + int tcp_port, + struct flb_config *config); +void vivo_http_server_destroy(struct vivo_http *ph); + +int vivo_http_server_start(struct vivo_http *ph); +int vivo_http_server_stop(struct vivo_http *ph); + +int vivo_http_server_mq_push_metrics(struct vivo_http *ph, + void *data, size_t size); + +#endif diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c new file mode 100644 index 000000000..9c8edb9ea --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.c @@ -0,0 +1,239 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_sds.h> + +#include "vivo.h" +#include "vivo_stream.h" + +static inline void stream_lock(struct vivo_stream *vs) +{ + pthread_mutex_lock(&vs->stream_mutex); +} + +static inline void stream_unlock(struct vivo_stream *vs) +{ + pthread_mutex_unlock(&vs->stream_mutex); +} + +struct vivo_stream *vivo_stream_create(struct vivo_exporter *ctx) +{ + struct vivo_stream *vs; + + vs = flb_calloc(1, sizeof(struct vivo_stream)); + if (!vs) { + flb_errno(); + return NULL; + } + vs->parent = ctx; + vs->entries_added = 0; + pthread_mutex_init(&vs->stream_mutex, NULL); + mk_list_init(&vs->entries); + mk_list_init(&vs->purge); + + return vs; +} + +static uint64_t vivo_stream_get_new_id(struct vivo_stream *vs) +{ + uint64_t id = 0; + + stream_lock(vs); + + /* to get the next id, we simply use the value of the counter 'entries' added */ + id = vs->entries_added; + + stream_unlock(vs); + + return id; +} + + +struct vivo_stream_entry *vivo_stream_entry_create(struct vivo_stream *vs, + void *data, size_t size) +{ + struct vivo_stream_entry *e; + + if (size == 0) { + return NULL; + } + + e = flb_calloc(1, sizeof(struct vivo_stream_entry)); + if (!e) { + flb_errno(); + return NULL; + } + e->id = vivo_stream_get_new_id(vs); + + e->data = flb_sds_create_len(data, size); + if (!e->data) { + flb_free(e); + return NULL; + } + + return e; +} + +/* + * NOTE: this function must always invoked under the stream_mutex in a locked state, we don't do the lock + * inside the function since the caller might be itering the parent list + */ +static void vivo_stream_entry_destroy(struct vivo_stream *vs, struct vivo_stream_entry *e) +{ + mk_list_del(&e->_head); + vs->current_bytes_size -= flb_sds_len(e->data); + flb_sds_destroy(e->data); + flb_free(e); +} + +/* NOTE: this function must run inside a stream_lock()/stream_unlock() protection */ +static void vivo_stream_cleanup(struct vivo_stream *vs) +{ + struct mk_list *tmp; + struct mk_list *head; + struct vivo_stream_entry *e; + + mk_list_foreach_safe(head, tmp, &vs->entries) { + e = mk_list_entry(head, struct vivo_stream_entry, _head); + vivo_stream_entry_destroy(vs, e); + } +} + +void vivo_stream_destroy(struct vivo_stream *vs) +{ + struct mk_list *tmp; + struct mk_list *head; + struct vivo_stream_entry *e; + + stream_lock(vs); + mk_list_foreach_safe(head, tmp, &vs->entries) { + e = mk_list_entry(head, struct vivo_stream_entry, _head); + vivo_stream_entry_destroy(vs, e); + } + stream_unlock(vs); + + flb_free(vs); +} + +flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to, + int64_t limit, + int64_t *stream_start_id, int64_t *stream_end_id) +{ + int64_t count = 0; + flb_sds_t buf; + struct mk_list *head; + struct vivo_stream_entry *e; + struct vivo_exporter *ctx = vs->parent; + + buf = flb_sds_create_size(vs->current_bytes_size); + if (!buf) { + return NULL; + } + + stream_lock(vs); + + mk_list_foreach(head, &vs->entries) { + e = mk_list_entry(head, struct vivo_stream_entry, _head); + + if (e->id < from && from != -1) { + continue; + } + + if (e->id > to && to != -1 && to != 0) { + break; + } + + if (count == 0) { + *stream_start_id = e->id; + } + + flb_sds_cat_safe(&buf, e->data, flb_sds_len(e->data)); + + *stream_end_id = e->id; + count++; + + if (limit > 0 && count >= limit) { + break; + } + } + + if (ctx->empty_stream_on_read) { + vivo_stream_cleanup(vs); + } + + stream_unlock(vs); + + return buf; +} + +/* Remove entries from the stream until cleanup 'size' bytes. This function is inside a stream_lock()/stream_unlock() */ +static void vivo_stream_make_room(struct vivo_stream *vs, size_t size) +{ + size_t deleted = 0; + struct mk_list *tmp; + struct mk_list *head; + struct vivo_stream_entry *e; + + mk_list_foreach_safe(head, tmp, &vs->entries) { + e = mk_list_entry(head, struct vivo_stream_entry, _head); + deleted += flb_sds_len(e->data); + vivo_stream_entry_destroy(vs, e); + if (deleted >= size) { + break; + } + } +} + +struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data, size_t size) +{ + struct vivo_stream_entry *e; + struct vivo_exporter *ctx = vs->parent; + + e = vivo_stream_entry_create(vs, data, size); + if (!e) { + return NULL; + } + + stream_lock(vs); + + /* check queue space */ + if (vs->current_bytes_size + size > ctx->stream_queue_size) { + /* free up some space */ + if (mk_list_size(&vs->entries) == 0) { + /* do nothing, the user size setup is smaller that the incoming size, let it pass */ + } + else { + /* release at least 'size' bytes */ + vivo_stream_make_room(vs, size); + } + } + + /* add entry to the end of the list */ + mk_list_add(&e->_head, &vs->entries); + + vs->entries_added++; + vs->current_bytes_size += size; + + stream_unlock(vs); + + return e; +} diff --git a/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h new file mode 100644 index 000000000..fb0ca6053 --- /dev/null +++ b/fluent-bit/plugins/out_vivo_exporter/vivo_stream.h @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_VIVO_STREAM_H +#define FLB_VIVO_STREAM_H + +#include <fluent-bit/flb_info.h> + +#include "vivo.h" + +struct vivo_stream_entry { + int64_t id; + flb_sds_t data; + struct mk_list _head; +}; + +struct vivo_stream { + size_t entries_added; + + size_t current_bytes_size; + + struct mk_list entries; + struct mk_list purge; + + /* mutex to protect the context */ + pthread_mutex_t stream_mutex; + + /* back reference to struct vivo_exporter context */ + void *parent; +}; + + +struct vivo_stream *vivo_stream_create(struct vivo_exporter *ctx); +void vivo_stream_destroy(struct vivo_stream *vs); +struct vivo_stream_entry *vivo_stream_entry_create(struct vivo_stream *vs, + void *data, size_t size); +struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data, + size_t size); +flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to, + int64_t limit, + int64_t *stream_start_id, int64_t *stream_end_id); + +#endif |