diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_storage_backlog/sb.c')
-rw-r--r-- | src/fluent-bit/plugins/in_storage_backlog/sb.c | 713 |
1 files changed, 713 insertions, 0 deletions
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
 * ==========
 * Copyright (C) 2015-2022 The Fluent Bit Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_input_chunk.h>
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/flb_utils.h>
#include <chunkio/chunkio.h>
#include <chunkio/cio_error.h>

#include <sys/types.h>
#include <sys/stat.h>

#ifndef FLB_SYSTEM_WINDOWS
#include <unistd.h>
#endif

/*
 * A reference to one on-disk chunk that is pending delivery to one output
 * plugin. The same cio_chunk may be referenced by several sb_out_chunk
 * wrappers, one per output queue it is routed to.
 */
struct sb_out_chunk {
    struct cio_chunk  *chunk;   /* underlying chunkio chunk (not owned) */
    struct cio_stream *stream;  /* stream the chunk belongs to (not owned) */
    size_t             size;    /* real (on-disk) size cached at enqueue time */
    struct mk_list     _head;   /* link into sb_out_queue->chunks */
};

/* Per-output-plugin backlog queue of pending chunks. */
struct sb_out_queue {
    struct flb_output_instance *ins;    /* output plugin this queue feeds */
    struct mk_list              chunks; /* head for every sb_out_chunk */
    struct mk_list              _head;  /* link into flb_sb->backlogs */
};

/* Plugin context for the storage_backlog pseudo-input. */
struct flb_sb {
    int coll_fd;                     /* collector id */
    size_t mem_limit;                /* memory limit */
    struct flb_input_instance *ins;  /* input instance */
    struct cio_ctx *cio;             /* chunk i/o instance */
    struct mk_list backlogs;         /* list of all pending chunks segregated by output plugin */
};


static inline struct flb_sb *sb_get_context(struct flb_config *config);

static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk,
                                              struct cio_stream *stream,
                                              size_t size);

static void sb_destroy_chunk(struct sb_out_chunk *chunk);

static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context);

static int sb_allocate_backlogs(struct flb_sb *ctx);

static void sb_destroy_backlogs(struct flb_sb *ctx);

static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance(
                                struct flb_output_instance *output_plugin,
                                struct flb_sb *context);

static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk,
                                                    struct sb_out_queue *backlog,
                                                    int destroy);

static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *chunk,
                                                     struct flb_sb *context);

static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk,
                                                 struct cio_stream *stream,
                                                 size_t target_chunk_size,
                                                 struct sb_out_queue *backlog);

static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk,
                                                  struct cio_stream *stream,
                                                  struct flb_sb *context);

int sb_segregate_chunks(struct flb_config *config);

int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
                                  ssize_t *required_space);

ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
                                             size_t required_space);


/*
 * Fetch the storage_backlog plugin context hanging off the global config.
 * Returns NULL when the config or the storage input plugin is not set
 * (i.e. filesystem storage is not enabled).
 */
static inline struct flb_sb *sb_get_context(struct flb_config *config)
{
    if (config == NULL) {
        return NULL;
    }

    if (config->storage_input_plugin == NULL) {
        return NULL;
    }

    return ((struct flb_input_instance *) config->storage_input_plugin)->context;
}

/*
 * Allocate a queue entry wrapping 'chunk'. The entry only references the
 * chunk and stream; it does not take ownership of them. Returns NULL on
 * allocation failure.
 */
static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk,
                                              struct cio_stream *stream,
                                              size_t size)
{
    struct sb_out_chunk *result;

    result = (struct sb_out_chunk *) flb_calloc(1, sizeof(struct sb_out_chunk));

    if (result != NULL) {
        result->chunk  = chunk;
        result->stream = stream;
        result->size   = size;
    }

    return result;
}
+static void sb_destroy_chunk(struct sb_out_chunk *chunk) +{ + flb_free(chunk); +} + +static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context) +{ + struct mk_list *chunk_iterator_tmp; + struct mk_list *chunk_iterator; + struct sb_out_chunk *chunk; + + mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { + chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); + + sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context); + } +} + +static int sb_allocate_backlogs(struct flb_sb *context) +{ + struct mk_list *output_plugin_iterator; + struct flb_output_instance *output_plugin; + struct sb_out_queue *backlog; + + mk_list_foreach(output_plugin_iterator, &context->ins->config->outputs) { + output_plugin = mk_list_entry(output_plugin_iterator, + struct flb_output_instance, + _head); + + backlog = (struct sb_out_queue *) \ + flb_calloc(1, sizeof(struct sb_out_queue)); + + if (backlog == NULL) { + sb_destroy_backlogs(context); + + return -1; + } + + backlog->ins = output_plugin; + + mk_list_init(&backlog->chunks); + + mk_list_add(&backlog->_head, &context->backlogs); + } + + return 0; +} + +static void sb_destroy_backlogs(struct flb_sb *context) +{ + struct mk_list *backlog_iterator_tmp; + struct mk_list *backlog_iterator; + struct sb_out_queue *backlog; + + mk_list_foreach_safe(backlog_iterator, backlog_iterator_tmp, &context->backlogs) { + backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); + + mk_list_del(&backlog->_head); + + sb_destroy_backlog(backlog, context); + + flb_free(backlog); + } +} + +static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance( + struct flb_output_instance *output_plugin, + struct flb_sb *context) +{ + struct mk_list *backlog_iterator; + struct sb_out_queue *backlog; + + mk_list_foreach(backlog_iterator, &context->backlogs) { + backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); + + if (output_plugin == 
backlog->ins) { + return backlog; + } + } + + return NULL; +} + +static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk, + struct sb_out_queue *backlog, + int destroy) +{ + struct mk_list *chunk_iterator_tmp; + struct mk_list *chunk_iterator; + struct sb_out_chunk *chunk; + + mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { + chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); + + if (chunk->chunk == target_chunk) { + mk_list_del(&chunk->_head); + + backlog->ins->fs_backlog_chunks_size -= cio_chunk_get_real_size(target_chunk); + + if (destroy) { + sb_destroy_chunk(chunk); + } + + break; + } + } +} + +static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *target_chunk, + struct flb_sb *context) +{ + struct mk_list *backlog_iterator; + struct sb_out_queue *backlog; + + mk_list_foreach(backlog_iterator, &context->backlogs) { + backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); + + sb_remove_chunk_from_segregated_backlog(target_chunk, backlog, FLB_TRUE); + } +} + +static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk, + struct cio_stream *stream, + size_t target_chunk_size, + struct sb_out_queue *backlog) +{ + struct sb_out_chunk *chunk; + + chunk = sb_allocate_chunk(target_chunk, stream, target_chunk_size); + + if (chunk == NULL) { + flb_errno(); + return -1; + } + + mk_list_add(&chunk->_head, &backlog->chunks); + + backlog->ins->fs_backlog_chunks_size += target_chunk_size; + + return 0; +} + +static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk, + struct cio_stream *stream, + struct flb_sb *context) +{ + struct flb_input_chunk dummy_input_chunk; + struct mk_list *tmp; + struct mk_list *head; + size_t chunk_size; + struct sb_out_queue *backlog; + int tag_len; + const char * tag_buf; + int result; + + memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); + + dummy_input_chunk.in = context->ins; + 
dummy_input_chunk.chunk = target_chunk; + + chunk_size = cio_chunk_get_real_size(target_chunk); + + if (chunk_size < 0) { + flb_warn("[storage backlog] could not get real size of chunk %s/%s", + stream->name, target_chunk->name); + return -1; + } + + result = flb_input_chunk_get_tag(&dummy_input_chunk, &tag_buf, &tag_len); + if (result == -1) { + flb_error("[storage backlog] could not retrieve chunk tag from %s/%s, " + "removing it from the queue", + stream->name, target_chunk->name); + return -2; + } + + flb_routes_mask_set_by_tag(dummy_input_chunk.routes_mask, tag_buf, tag_len, + context->ins); + + mk_list_foreach_safe(head, tmp, &context->backlogs) { + backlog = mk_list_entry(head, struct sb_out_queue, _head); + if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, + backlog->ins->id)) { + result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, + chunk_size, backlog); + if (result) { + return -3; + } + } + } + + return 0; +} + +int sb_segregate_chunks(struct flb_config *config) +{ + int ret; + size_t size; + struct mk_list *tmp; + struct mk_list *stream_iterator; + struct mk_list *chunk_iterator; + int chunk_error; + struct flb_sb *context; + struct cio_stream *stream; + struct cio_chunk *chunk; + + context = sb_get_context(config); + + if (context == NULL) { + return 0; + } + + ret = sb_allocate_backlogs(context); + if (ret) { + return -2; + } + + mk_list_foreach(stream_iterator, &context->cio->streams) { + stream = mk_list_entry(stream_iterator, struct cio_stream, _head); + + mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) { + chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head); + + if (!cio_chunk_is_up(chunk)) { + ret = cio_chunk_up_force(chunk); + if (ret == CIO_CORRUPTED) { + if (config->storage_del_bad_chunks) { + chunk_error = cio_error_get(chunk); + + if (chunk_error == CIO_ERR_BAD_FILE_SIZE || + chunk_error == CIO_ERR_BAD_LAYOUT) + { + flb_plg_error(context->ins, "discarding irrecoverable chunk %s/%s", 
stream->name, chunk->name); + + cio_chunk_close(chunk, CIO_TRUE); + } + } + + continue; + } + } + + if (!cio_chunk_is_up(chunk)) { + return -3; + } + + /* try to segregate a chunk */ + ret = sb_append_chunk_to_segregated_backlogs(chunk, stream, context); + if (ret) { + /* + * if the chunk could not be segregated, just remove it from the + * queue and continue. + * + * if content size is zero, it's safe to 'delete it'. + */ + size = cio_chunk_get_content_size(chunk); + if (size <= 0) { + cio_chunk_close(chunk, CIO_TRUE); + } + else { + cio_chunk_close(chunk, CIO_FALSE); + } + continue; + } + + /* lock the chunk */ + flb_plg_info(context->ins, "register %s/%s", stream->name, chunk->name); + + cio_chunk_lock(chunk); + cio_chunk_down(chunk); + } + } + + return 0; +} + +ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, + size_t required_space) +{ + ssize_t releasable_space; + struct mk_list *chunk_iterator; + struct flb_sb *context; + struct sb_out_queue *backlog; + struct sb_out_chunk *chunk; + + context = sb_get_context(output_plugin->config); + + if (context == NULL) { + return 0; + } + + backlog = sb_find_segregated_backlog_by_output_plugin_instance( + output_plugin, context); + + if (backlog == NULL) { + return 0; + } + + releasable_space = 0; + + mk_list_foreach(chunk_iterator, &backlog->chunks) { + chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); + + releasable_space += chunk->size; + + if (releasable_space >= required_space) { + break; + } + } + + return releasable_space; +} + +int sb_release_output_queue_space(struct flb_output_instance *output_plugin, + ssize_t *required_space) +{ + struct mk_list *chunk_iterator_tmp; + struct cio_chunk *underlying_chunk; + struct mk_list *chunk_iterator; + size_t released_space; + struct flb_sb *context; + struct sb_out_queue *backlog; + struct sb_out_chunk *chunk; + + context = sb_get_context(output_plugin->config); + + if (context == NULL) { + return -1; + } + + 
backlog = sb_find_segregated_backlog_by_output_plugin_instance( + output_plugin, context); + + if (backlog == NULL) { + return -2; + } + + released_space = 0; + + mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { + chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); + + released_space += chunk->size; + underlying_chunk = chunk->chunk; + + sb_remove_chunk_from_segregated_backlogs(underlying_chunk, context); + cio_chunk_close(underlying_chunk, FLB_TRUE); + + if (released_space >= *required_space) { + break; + } + } + + *required_space -= released_space; + + return 0; +} + +/* Collection callback */ +static int cb_queue_chunks(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + size_t empty_output_queue_count; + struct mk_list *output_queue_iterator; + struct sb_out_queue *output_queue_instance; + struct sb_out_chunk *chunk_instance; + struct flb_sb *ctx; + struct flb_input_chunk *ic; + struct flb_input_chunk tmp_ic; + void *ch; + size_t total = 0; + ssize_t size; + int ret; + int event_type; + + /* Get context */ + ctx = (struct flb_sb *) data; + + /* Get the total number of bytes already enqueued */ + total = flb_input_chunk_total_size(in); + + /* If we already hitted our limit, just wait and re-check later */ + if (total >= ctx->mem_limit) { + return 0; + } + + empty_output_queue_count = 0; + + while (total < ctx->mem_limit && + empty_output_queue_count < mk_list_size(&ctx->backlogs)) { + empty_output_queue_count = 0; + + mk_list_foreach(output_queue_iterator, &ctx->backlogs) { + output_queue_instance = mk_list_entry(output_queue_iterator, + struct sb_out_queue, + _head); + + if (mk_list_is_empty(&output_queue_instance->chunks) != 0) { + chunk_instance = mk_list_entry_first(&output_queue_instance->chunks, + struct sb_out_chunk, + _head); + + /* Try to enqueue one chunk */ + /* + * All chunks on this backlog are 'file' based, always try to set + * them up. We validate the status. 
+ */ + ret = cio_chunk_is_up(chunk_instance->chunk); + + if (ret == CIO_FALSE) { + ret = cio_chunk_up_force(chunk_instance->chunk); + + if (ret == CIO_CORRUPTED) { + flb_plg_error(ctx->ins, "removing corrupted chunk from the " + "queue %s:%s", + chunk_instance->stream->name, chunk_instance->chunk->name); + cio_chunk_close(chunk_instance->chunk, FLB_FALSE); + sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); + /* This function will indirecly release chunk_instance so it has to be + * called last. + */ + continue; + } + else if (ret == CIO_ERROR || ret == CIO_RETRY) { + continue; + } + } + + /* + * Map the chunk file context into a temporary buffer since the + * flb_input_chunk_get_event_type() interface needs an + * struct fb_input_chunk argument. + */ + tmp_ic.chunk = chunk_instance->chunk; + + /* Retrieve the event type: FLB_INPUT_LOGS, FLB_INPUT_METRICS of FLB_INPUT_TRACES */ + ret = flb_input_chunk_get_event_type(&tmp_ic); + if (ret == -1) { + flb_plg_error(ctx->ins, "removing chunk with wrong metadata " + "from the queue %s:%s", + chunk_instance->stream->name, + chunk_instance->chunk->name); + cio_chunk_close(chunk_instance->chunk, FLB_TRUE); + sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, + ctx); + continue; + } + event_type = ret; + + /* get the number of bytes being used by the chunk */ + size = cio_chunk_get_content_size(chunk_instance->chunk); + if (size <= 0) { + flb_plg_error(ctx->ins, "removing empty chunk from the " + "queue %s:%s", + chunk_instance->stream->name, chunk_instance->chunk->name); + cio_chunk_close(chunk_instance->chunk, FLB_TRUE); + sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); + /* This function will indirecly release chunk_instance so it has to be + * called last. 
+ */ + continue; + } + + ch = chunk_instance->chunk; + + /* Associate this backlog chunk to this instance into the engine */ + ic = flb_input_chunk_map(in, event_type, ch); + if (!ic) { + flb_plg_error(ctx->ins, "removing chunk %s:%s from the queue", + chunk_instance->stream->name, chunk_instance->chunk->name); + cio_chunk_down(chunk_instance->chunk); + + /* + * If the file cannot be mapped, just drop it. Failures are all + * associated with data corruption. + */ + cio_chunk_close(chunk_instance->chunk, FLB_TRUE); + sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); + /* This function will indirecly release chunk_instance so it has to be + * called last. + */ + continue; + } + + flb_plg_info(ctx->ins, "queueing %s:%s", + chunk_instance->stream->name, chunk_instance->chunk->name); + + /* We are removing this chunk reference from this specific backlog + * queue but we need to leave it in the remainder queues. + */ + sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); + cio_chunk_down(ch); + + /* check our limits */ + total += size; + } + else { + empty_output_queue_count++; + } + } + } + + return 0; +} + +/* Initialize plugin */ +static int cb_sb_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + char mem[32]; + struct flb_sb *ctx; + + ctx = flb_malloc(sizeof(struct flb_sb)); + if (!ctx) { + flb_errno(); + return -1; + } + + ctx->cio = data; + ctx->ins = in; + ctx->mem_limit = flb_utils_size_to_bytes(config->storage_bl_mem_limit); + + mk_list_init(&ctx->backlogs); + + flb_utils_bytes_to_human_readable_size(ctx->mem_limit, mem, sizeof(mem) - 1); + flb_plg_info(ctx->ins, "queue memory limit: %s", mem); + + /* export plugin context */ + flb_input_set_context(in, ctx); + + /* Set a collector to trigger the callback to queue data every second */ + ret = flb_input_set_collector_time(in, cb_queue_chunks, 1, 0, config); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not create collector"); + 
flb_free(ctx); + return -1; + } + ctx->coll_fd = ret; + + return 0; +} + +static void cb_sb_pause(void *data, struct flb_config *config) +{ + struct flb_sb *ctx = data; + flb_input_collector_pause(ctx->coll_fd, ctx->ins); +} + +static void cb_sb_resume(void *data, struct flb_config *config) +{ + struct flb_sb *ctx = data; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); +} + +static int cb_sb_exit(void *data, struct flb_config *config) +{ + struct flb_sb *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + + sb_destroy_backlogs(ctx); + + flb_free(ctx); + + return 0; +} + +/* Plugin reference */ +struct flb_input_plugin in_storage_backlog_plugin = { + .name = "storage_backlog", + .description = "Storage Backlog", + .cb_init = cb_sb_init, + .cb_pre_run = NULL, + .cb_collect = NULL, + .cb_ingest = NULL, + .cb_flush_buf = NULL, + .cb_pause = cb_sb_pause, + .cb_resume = cb_sb_resume, + .cb_exit = cb_sb_exit, + + /* This plugin can only be configured and invoked by the Engine */ + .flags = FLB_INPUT_PRIVATE +}; |