summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_storage.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_storage.c
parentInitial commit. (diff)
downloadnetdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz
netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_storage.c')
-rw-r--r--fluent-bit/src/flb_storage.c718
1 files changed, 718 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_storage.c b/fluent-bit/src/flb_storage.c
new file mode 100644
index 00000000..a89a4c4b
--- /dev/null
+++ b/fluent-bit/src/flb_storage.c
@@ -0,0 +1,718 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_scheduler.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_http_server.h>
+
+static struct cmt *metrics_context_create(struct flb_storage_metrics *sm)
+{
+ struct cmt *cmt;
+
+ cmt = cmt_create();
+ if (!cmt) {
+ return NULL;
+ }
+
+ sm->cmt_chunks = cmt_gauge_create(cmt,
+ "fluentbit", "storage", "chunks",
+ "Total number of chunks in the storage layer.",
+ 0, (char *[]) { NULL });
+
+ sm->cmt_mem_chunks = cmt_gauge_create(cmt,
+ "fluentbit", "storage", "mem_chunks",
+ "Total number of memory chunks.",
+ 0, (char *[]) { NULL });
+
+ sm->cmt_fs_chunks = cmt_gauge_create(cmt,
+ "fluentbit", "storage", "fs_chunks",
+ "Total number of filesystem chunks.",
+ 0, (char *[]) { NULL });
+
+ sm->cmt_fs_chunks_up = cmt_gauge_create(cmt,
+ "fluentbit", "storage", "fs_chunks_up",
+ "Total number of filesystem chunks up in memory.",
+ 0, (char *[]) { NULL });
+
+ sm->cmt_fs_chunks_down = cmt_gauge_create(cmt,
+ "fluentbit", "storage", "fs_chunks_down",
+ "Total number of filesystem chunks down.",
+ 0, (char *[]) { NULL });
+
+ return cmt;
+}
+
+
+/* This function collect the 'global' metrics of the storage layer (cmetrics) */
+int flb_storage_metrics_update(struct flb_config *ctx, struct flb_storage_metrics *sm)
+{
+ uint64_t ts;
+ struct cio_stats st;
+
+ /* Retrieve general stats from the storage layer */
+ cio_stats_get(ctx->cio, &st);
+
+ ts = cfl_time_now();
+
+ cmt_gauge_set(sm->cmt_chunks, ts, st.chunks_total, 0, NULL);
+ cmt_gauge_set(sm->cmt_mem_chunks, ts, st.chunks_mem, 0, NULL);
+ cmt_gauge_set(sm->cmt_fs_chunks, ts, st.chunks_fs, 0, NULL);
+ cmt_gauge_set(sm->cmt_fs_chunks_up, ts, st.chunks_fs_up, 0, NULL);
+ cmt_gauge_set(sm->cmt_fs_chunks_down, ts, st.chunks_fs_down, 0, NULL);
+
+ return 0;
+}
+
+static void metrics_append_general(msgpack_packer *mp_pck,
+ struct flb_config *ctx,
+ struct flb_storage_metrics *sm)
+{
+ struct cio_stats storage_st;
+
+ /* Retrieve general stats from the storage layer */
+ cio_stats_get(ctx->cio, &storage_st);
+
+ msgpack_pack_str(mp_pck, 13);
+ msgpack_pack_str_body(mp_pck, "storage_layer", 13);
+ msgpack_pack_map(mp_pck, 1);
+
+ /* Chunks */
+ msgpack_pack_str(mp_pck, 6);
+ msgpack_pack_str_body(mp_pck, "chunks", 6);
+ msgpack_pack_map(mp_pck, 5);
+
+ /* chunks['total_chunks'] */
+ msgpack_pack_str(mp_pck, 12);
+ msgpack_pack_str_body(mp_pck, "total_chunks", 12);
+ msgpack_pack_uint64(mp_pck, storage_st.chunks_total);
+
+ /* chunks['mem_chunks'] */
+ msgpack_pack_str(mp_pck, 10);
+ msgpack_pack_str_body(mp_pck, "mem_chunks", 10);
+ msgpack_pack_uint64(mp_pck, storage_st.chunks_mem);
+
+ /* chunks['fs_chunks'] */
+ msgpack_pack_str(mp_pck, 9);
+ msgpack_pack_str_body(mp_pck, "fs_chunks", 9);
+ msgpack_pack_uint64(mp_pck, storage_st.chunks_fs);
+
+ /* chunks['fs_up_chunks'] */
+ msgpack_pack_str(mp_pck, 12);
+ msgpack_pack_str_body(mp_pck, "fs_chunks_up", 12);
+ msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_up);
+
+ /* chunks['fs_down_chunks'] */
+ msgpack_pack_str(mp_pck, 14);
+ msgpack_pack_str_body(mp_pck, "fs_chunks_down", 14);
+ msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_down);
+}
+
+static void metrics_append_input(msgpack_packer *mp_pck,
+ struct flb_config *ctx,
+ struct flb_storage_metrics *sm)
+{
+ int len;
+ int ret;
+ uint64_t ts;
+ const char *tmp;
+ char buf[32];
+ ssize_t size;
+ size_t total_chunks;
+
+ /* chunks */
+ int up;
+ int down;
+ int busy;
+ char *name;
+ ssize_t busy_size;
+ struct mk_list *head;
+ struct mk_list *h_chunks;
+ struct flb_input_instance *i;
+ struct flb_input_chunk *ic;
+
+ /*
+ * DISCLAIMER: This interface will be deprecated once we extend Chunk I/O
+ * stats per stream.
+ *
+ * For now and to avoid duplication of iterating chunks we are adding the
+ * metrics counting for CMetrics inside the same logic for the old code.
+ */
+
+ msgpack_pack_str(mp_pck, 12);
+ msgpack_pack_str_body(mp_pck, "input_chunks", 12);
+ msgpack_pack_map(mp_pck, mk_list_size(&ctx->inputs));
+
+ /* current time */
+ ts = cfl_time_now();
+
+ /* Input Plugins Ingestion */
+ mk_list_foreach(head, &ctx->inputs) {
+ i = mk_list_entry(head, struct flb_input_instance, _head);
+
+ name = (char *) flb_input_name(i);
+ total_chunks = mk_list_size(&i->chunks);
+
+ tmp = flb_input_name(i);
+ len = strlen(tmp);
+
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, tmp, len);
+
+ /* Map for 'status' and 'chunks' */
+ msgpack_pack_map(mp_pck, 2);
+
+ /*
+ * Status
+ * ======
+ */
+ msgpack_pack_str(mp_pck, 6);
+ msgpack_pack_str_body(mp_pck, "status", 6);
+
+ /* 'status' map has 2 keys: overlimit and chunks */
+ msgpack_pack_map(mp_pck, 3);
+
+ /* status['overlimit'] */
+ msgpack_pack_str(mp_pck, 9);
+ msgpack_pack_str_body(mp_pck, "overlimit", 9);
+
+
+ /* CMetrics */
+ ret = FLB_FALSE;
+ if (i->mem_buf_limit > 0) {
+ if (i->mem_chunks_size >= i->mem_buf_limit) {
+ ret = FLB_TRUE;
+ }
+ }
+ if (ret == FLB_TRUE) {
+ /* cmetrics */
+ cmt_gauge_set(i->cmt_storage_overlimit, ts, 1,
+ 1, (char *[]) {name});
+
+ /* old code */
+ msgpack_pack_true(mp_pck);
+ }
+ else {
+ /* cmetrics */
+ cmt_gauge_set(i->cmt_storage_overlimit, ts, 0,
+ 1, (char *[]) {name});
+
+ /* old code */
+ msgpack_pack_false(mp_pck);
+ }
+
+ /* fluentbit_storage_memory_bytes */
+ cmt_gauge_set(i->cmt_storage_memory_bytes, ts, i->mem_chunks_size,
+ 1, (char *[]) {name});
+
+ /* status['mem_size'] */
+ msgpack_pack_str(mp_pck, 8);
+ msgpack_pack_str_body(mp_pck, "mem_size", 8);
+
+ /* Current memory size used based on last ingestion */
+ flb_utils_bytes_to_human_readable_size(i->mem_chunks_size,
+ buf, sizeof(buf) - 1);
+ len = strlen(buf);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, buf, len);
+
+ /* status['mem_limit'] */
+ msgpack_pack_str(mp_pck, 9);
+ msgpack_pack_str_body(mp_pck, "mem_limit", 9);
+
+ flb_utils_bytes_to_human_readable_size(i->mem_buf_limit,
+ buf, sizeof(buf) - 1);
+ len = strlen(buf);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, buf, len);
+
+ /*
+ * Chunks
+ * ======
+ */
+
+ /* cmetrics */
+ cmt_gauge_set(i->cmt_storage_chunks, ts, total_chunks,
+ 1, (char *[]) {name});
+
+
+ /* old code */
+ msgpack_pack_str(mp_pck, 6);
+ msgpack_pack_str_body(mp_pck, "chunks", 6);
+
+ /* 'chunks' has 3 keys: total, up, down, busy and busy_size */
+ msgpack_pack_map(mp_pck, 5);
+
+ /* chunks['total_chunks'] */
+ msgpack_pack_str(mp_pck, 5);
+ msgpack_pack_str_body(mp_pck, "total", 5);
+ msgpack_pack_uint64(mp_pck, total_chunks);
+
+ /*
+ * chunks Details: chunks marked as 'busy' are 'locked' since they are in
+ * a 'flush' state. No more data can be appended to a busy chunk.
+ */
+ busy = 0;
+ busy_size = 0;
+
+ /* up/down */
+ up = 0;
+ down = 0;
+
+ /* Iterate chunks for the input instance in question */
+ mk_list_foreach(h_chunks, &i->chunks) {
+ ic = mk_list_entry(h_chunks, struct flb_input_chunk, _head);
+ if (ic->busy == FLB_TRUE) {
+ busy++;
+ size = cio_chunk_get_content_size(ic->chunk);
+ if (size >= 0) {
+ busy_size += size;
+ }
+ }
+
+ if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
+ up++;
+ }
+ else {
+ down++;
+ }
+
+ }
+
+ /* fluentbit_storage_chunks_up */
+ cmt_gauge_set(i->cmt_storage_chunks_up, ts, up,
+ 1, (char *[]) {name});
+
+ /* chunks['up'] */
+ msgpack_pack_str(mp_pck, 2);
+ msgpack_pack_str_body(mp_pck, "up", 2);
+ msgpack_pack_uint64(mp_pck, up);
+
+ /* fluentbit_storage_chunks_down */
+ cmt_gauge_set(i->cmt_storage_chunks_down, ts, down,
+ 1, (char *[]) {name});
+
+ /* chunks['down'] */
+ msgpack_pack_str(mp_pck, 4);
+ msgpack_pack_str_body(mp_pck, "down", 4);
+ msgpack_pack_uint64(mp_pck, down);
+
+ /* fluentbit_storage_chunks_busy */
+ cmt_gauge_set(i->cmt_storage_chunks_busy, ts, busy,
+ 1, (char *[]) {name});
+
+ /* chunks['busy'] */
+ msgpack_pack_str(mp_pck, 4);
+ msgpack_pack_str_body(mp_pck, "busy", 4);
+ msgpack_pack_uint64(mp_pck, busy);
+
+ /* fluentbit_storage_chunks_busy_size */
+ cmt_gauge_set(i->cmt_storage_chunks_busy_bytes, ts, busy_size,
+ 1, (char *[]) {name});
+
+ /* chunks['busy_size'] */
+ msgpack_pack_str(mp_pck, 9);
+ msgpack_pack_str_body(mp_pck, "busy_size", 9);
+
+ flb_utils_bytes_to_human_readable_size(busy_size, buf, sizeof(buf) - 1);
+ len = strlen(buf);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, buf, len);
+ }
+}
+
+static void cb_storage_metrics_collect(struct flb_config *ctx, void *data)
+{
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+
+ /* Prepare new outgoing buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Pack main map and append relevant data */
+ msgpack_pack_map(&mp_pck, 2);
+ metrics_append_general(&mp_pck, ctx, data);
+ metrics_append_input(&mp_pck, ctx, data);
+
+#ifdef FLB_HAVE_HTTP_SERVER
+ if (ctx->http_server == FLB_TRUE && ctx->storage_metrics == FLB_TRUE) {
+ flb_hs_push_storage_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size);
+ }
+#endif
+ msgpack_sbuffer_destroy(&mp_sbuf);
+}
+
+struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx)
+{
+ int ret;
+ struct flb_storage_metrics *sm;
+
+ sm = flb_calloc(1, sizeof(struct flb_storage_metrics));
+ if (!sm) {
+ flb_errno();
+ return NULL;
+ }
+ sm->cmt = metrics_context_create(sm);
+ if(!sm->cmt) {
+ flb_free(sm);
+ return NULL;
+ }
+
+ ret = flb_sched_timer_cb_create(ctx->sched, FLB_SCHED_TIMER_CB_PERM, 5000,
+ cb_storage_metrics_collect,
+ ctx->storage_metrics_ctx, NULL);
+ if (ret == -1) {
+ flb_error("[storage metrics] cannot create timer to collect metrics");
+ flb_free(sm);
+ return NULL;
+ }
+
+ return sm;
+}
+
+static int sort_chunk_cmp(const void *a_arg, const void *b_arg)
+{
+ char *p;
+ struct cio_chunk *chunk_a = *(struct cio_chunk **) a_arg;
+ struct cio_chunk *chunk_b = *(struct cio_chunk **) b_arg;
+ struct timespec tm_a;
+ struct timespec tm_b;
+
+ /* Scan Chunk A */
+ p = strchr(chunk_a->name, '-');
+ if (!p) {
+ return -1;
+ }
+ p++;
+
+ sscanf(p, "%lu.%lu.flb", &tm_a.tv_sec, &tm_a.tv_nsec);
+
+ /* Scan Chunk B */
+ p = strchr(chunk_b->name, '-');
+ if (!p) {
+ return -1;
+ }
+ p++;
+ sscanf(p, "%lu.%lu.flb", &tm_b.tv_sec, &tm_b.tv_nsec);
+
+ /* Compare */
+ if (tm_a.tv_sec != tm_b.tv_sec) {
+ if (tm_a.tv_sec > tm_b.tv_sec) {
+ return 1;
+ }
+ else {
+ return -1;
+ }
+ }
+ else {
+ if (tm_a.tv_nsec > tm_b.tv_nsec) {
+ return 1;
+ }
+ else if (tm_a.tv_nsec < tm_b.tv_nsec) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void print_storage_info(struct flb_config *ctx, struct cio_ctx *cio)
+{
+ char *type;
+ char *sync;
+ char *checksum;
+ struct flb_input_instance *in;
+
+ if (cio->options.root_path) {
+ type = "memory+filesystem";
+ }
+ else {
+ type = "memory";
+ }
+
+ if (cio->options.flags & CIO_FULL_SYNC) {
+ sync = "full";
+ }
+ else {
+ sync = "normal";
+ }
+
+ if (cio->options.flags & CIO_CHECKSUM) {
+ checksum = "on";
+ }
+ else {
+ checksum = "off";
+ }
+
+ flb_info("[storage] ver=%s, type=%s, sync=%s, checksum=%s, max_chunks_up=%i",
+ cio_version(), type, sync, checksum, ctx->storage_max_chunks_up);
+
+ /* Storage input plugin */
+ if (ctx->storage_input_plugin) {
+ in = (struct flb_input_instance *) ctx->storage_input_plugin;
+ flb_info("[storage] backlog input plugin: %s", in->name);
+ }
+}
+
+static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line,
+ char *str)
+{
+ if (level == CIO_LOG_ERROR) {
+ flb_error("[storage] %s", str);
+ }
+ else if (level == CIO_LOG_WARN) {
+ flb_warn("[storage] %s", str);
+ }
+ else if (level == CIO_LOG_INFO) {
+ flb_info("[storage] %s", str);
+ }
+ else if (level == CIO_LOG_DEBUG) {
+ flb_debug("[storage] %s", str);
+ }
+
+ return 0;
+}
+
+int flb_storage_input_create(struct cio_ctx *cio,
+ struct flb_input_instance *in)
+{
+ int cio_storage_type;
+ struct flb_storage_input *si;
+ struct cio_stream *stream;
+
+ /* storage config: get stream type */
+ if (in->storage_type == -1) {
+ in->storage_type = FLB_STORAGE_MEM;
+ }
+
+ if (in->storage_type == FLB_STORAGE_FS && cio->options.root_path == NULL) {
+ flb_error("[storage] instance '%s' requested filesystem storage "
+ "but no filesystem path was defined.",
+ flb_input_name(in));
+ return -1;
+ }
+
+ /*
+ * The input instance can define it owns storage type which is based on some
+ * specific Chunk I/O storage type. We handle the proper initialization here.
+ */
+ cio_storage_type = in->storage_type;
+ if (in->storage_type == FLB_STORAGE_MEMRB) {
+ cio_storage_type = FLB_STORAGE_MEM;
+ }
+
+ /* Check for duplicates */
+ stream = cio_stream_get(cio, in->name);
+ if (!stream) {
+ /* create stream for input instance */
+ stream = cio_stream_create(cio, in->name, cio_storage_type);
+ if (!stream) {
+ flb_error("[storage] cannot create stream for instance %s",
+ in->name);
+ return -1;
+ }
+ }
+
+ /* allocate storage context for the input instance */
+ si = flb_malloc(sizeof(struct flb_storage_input));
+ if (!si) {
+ flb_errno();
+ return -1;
+ }
+
+ si->stream = stream;
+ si->cio = cio;
+ si->type = in->storage_type;
+ in->storage = si;
+
+ return 0;
+}
+
+void flb_storage_input_destroy(struct flb_input_instance *in)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_input_chunk *ic;
+
+ /* Save current temporary data and destroy chunk references */
+ mk_list_foreach_safe(head, tmp, &in->chunks) {
+ ic = mk_list_entry(head, struct flb_input_chunk, _head);
+ flb_input_chunk_destroy(ic, FLB_FALSE);
+ }
+
+ flb_free(in->storage);
+ in->storage = NULL;
+}
+
+static int storage_contexts_create(struct flb_config *config)
+{
+ int c = 0;
+ int ret;
+ struct mk_list *head;
+ struct flb_input_instance *in;
+
+ /* Iterate each input instance and create a stream for it */
+ mk_list_foreach(head, &config->inputs) {
+ in = mk_list_entry(head, struct flb_input_instance, _head);
+ ret = flb_storage_input_create(config->cio, in);
+ if (ret == -1) {
+ flb_error("[storage] could not create storage for instance: %s",
+ in->name);
+ return -1;
+ }
+ c++;
+ }
+
+ return c;
+}
+
+int flb_storage_create(struct flb_config *ctx)
+{
+ int ret;
+ int flags;
+ struct flb_input_instance *in = NULL;
+ struct cio_ctx *cio;
+ struct cio_options opts = {0};
+
+ /* always use read/write mode */
+ flags = CIO_OPEN;
+
+ /* if explicitly stated any irrecoverably corrupted
+ * chunks will be deleted */
+ if (ctx->storage_del_bad_chunks) {
+ flags |= CIO_DELETE_IRRECOVERABLE;
+ }
+
+ /* synchronization mode */
+ if (ctx->storage_sync) {
+ if (strcasecmp(ctx->storage_sync, "normal") == 0) {
+ /* do nothing, keep the default */
+ }
+ else if (strcasecmp(ctx->storage_sync, "full") == 0) {
+ flags |= CIO_FULL_SYNC;
+ }
+ else {
+ flb_error("[storage] invalid synchronization mode");
+ return -1;
+ }
+ }
+
+ /* checksum */
+ if (ctx->storage_checksum == FLB_TRUE) {
+ flags |= CIO_CHECKSUM;
+ }
+
+ /* file trimming */
+ if (ctx->storage_trim_files == FLB_TRUE) {
+ flags |= CIO_TRIM_FILES;
+ }
+
+ /* chunkio options */
+ cio_options_init(&opts);
+
+ opts.root_path = ctx->storage_path;
+ opts.flags = flags;
+ opts.log_cb = log_cb;
+ opts.log_level = CIO_LOG_INFO;
+
+ /* Create chunkio context */
+ cio = cio_create(&opts);
+ if (!cio) {
+ flb_error("[storage] error initializing storage engine");
+ return -1;
+ }
+ ctx->cio = cio;
+
+ /* Set Chunk I/O maximum number of chunks up */
+ if (ctx->storage_max_chunks_up == 0) {
+ ctx->storage_max_chunks_up = FLB_STORAGE_MAX_CHUNKS_UP;
+ }
+ cio_set_max_chunks_up(ctx->cio, ctx->storage_max_chunks_up);
+
+ /* Load content from the file system if any */
+ ret = cio_load(ctx->cio, NULL);
+ if (ret == -1) {
+ flb_error("[storage] error scanning root path content: %s",
+ ctx->storage_path);
+ cio_destroy(ctx->cio);
+ return -1;
+ }
+
+ /* Sort chunks */
+ cio_qsort(ctx->cio, sort_chunk_cmp);
+
+ /*
+ * If we have a filesystem storage path, create an instance of the
+ * storage_backlog input plugin to consume any possible pending
+ * chunks.
+ */
+ if (ctx->storage_path) {
+ in = flb_input_new(ctx, "storage_backlog", cio, FLB_FALSE);
+ if (!in) {
+ flb_error("[storage] cannot init storage backlog input plugin");
+ cio_destroy(cio);
+ ctx->cio = NULL;
+ return -1;
+ }
+ ctx->storage_input_plugin = in;
+
+ /* Set a queue memory limit */
+ if (!ctx->storage_bl_mem_limit) {
+ ctx->storage_bl_mem_limit = flb_strdup(FLB_STORAGE_BL_MEM_LIMIT);
+ }
+ }
+
+ /* Create streams for input instances */
+ ret = storage_contexts_create(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* print storage info */
+ print_storage_info(ctx, cio);
+
+ return 0;
+}
+
+void flb_storage_destroy(struct flb_config *ctx)
+{
+ struct cio_ctx *cio;
+ struct flb_storage_metrics *sm;
+
+ /* Destroy Chunk I/O context */
+ cio = (struct cio_ctx *) ctx->cio;
+
+ if (!cio) {
+ return;
+ }
+
+ sm = ctx->storage_metrics_ctx;
+ if (ctx->storage_metrics == FLB_TRUE && sm != NULL) {
+ cmt_destroy(sm->cmt);
+ flb_free(sm);
+ ctx->storage_metrics_ctx = NULL;
+ }
+
+ cio_destroy(cio);
+ ctx->cio = NULL;
+}