diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_storage_backlog/sb.c')
-rw-r--r-- | src/fluent-bit/plugins/in_storage_backlog/sb.c | 713 |
1 files changed, 0 insertions, 713 deletions
diff --git a/src/fluent-bit/plugins/in_storage_backlog/sb.c b/src/fluent-bit/plugins/in_storage_backlog/sb.c deleted file mode 100644 index 1380caf8a..000000000 --- a/src/fluent-bit/plugins/in_storage_backlog/sb.c +++ /dev/null @@ -1,713 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_input_chunk.h> -#include <fluent-bit/flb_storage.h> -#include <fluent-bit/flb_utils.h> -#include <chunkio/chunkio.h> -#include <chunkio/cio_error.h> - -#include <sys/types.h> -#include <sys/stat.h> - -#ifndef FLB_SYSTEM_WINDOWS -#include <unistd.h> -#endif - -struct sb_out_chunk { - struct cio_chunk *chunk; - struct cio_stream *stream; - size_t size; - struct mk_list _head; -}; - -struct sb_out_queue { - struct flb_output_instance *ins; - struct mk_list chunks; /* head for every sb_out_chunk */ - struct mk_list _head; -}; - -struct flb_sb { - int coll_fd; /* collector id */ - size_t mem_limit; /* memory limit */ - struct flb_input_instance *ins; /* input instance */ - struct cio_ctx *cio; /* chunk i/o instance */ - struct mk_list backlogs; /* list of all pending chunks segregated by output plugin */ -}; - - -static inline struct flb_sb *sb_get_context(struct flb_config *config); - -static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk, - struct cio_stream *stream, - size_t size); - -static void sb_destroy_chunk(struct sb_out_chunk *chunk); - -static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context); - -static int sb_allocate_backlogs(struct flb_sb *ctx); - -static void sb_destroy_backlogs(struct flb_sb *ctx); - -static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance( - struct flb_output_instance *output_plugin, - struct flb_sb *context); - -static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk, - struct sb_out_queue *backlog, - int destroy); - -static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *chunk, - struct flb_sb *context); - -static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk, - struct cio_stream *stream, - size_t target_chunk_size, - struct sb_out_queue *backlog); - -static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk, - struct cio_stream *stream, - struct flb_sb *context); - -int sb_segregate_chunks(struct flb_config *config); - -int sb_release_output_queue_space(struct flb_output_instance *output_plugin, - ssize_t *required_space); - -ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, - size_t required_space); - - -static inline struct flb_sb *sb_get_context(struct flb_config *config) -{ - if (config == NULL) { - return NULL; - } - - if (config->storage_input_plugin == NULL) { - return NULL; - } - - return ((struct flb_input_instance *) config->storage_input_plugin)->context; -} - -static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk, - struct cio_stream *stream, - size_t size) -{ - struct sb_out_chunk *result; - - result = (struct sb_out_chunk *) flb_calloc(1, sizeof(struct sb_out_chunk)); - - if (result != NULL) { - result->chunk = chunk; - result->stream = stream; - result->size = size; - } - - return result; -} - -static void sb_destroy_chunk(struct sb_out_chunk *chunk) -{ - flb_free(chunk); -} - -static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context) -{ - struct mk_list *chunk_iterator_tmp; - struct mk_list *chunk_iterator; - struct sb_out_chunk *chunk; - - mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { - chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); - - sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context); - } -} - -static int sb_allocate_backlogs(struct flb_sb *context) -{ - struct mk_list *output_plugin_iterator; - struct flb_output_instance *output_plugin; - struct sb_out_queue *backlog; - - mk_list_foreach(output_plugin_iterator, &context->ins->config->outputs) { - output_plugin = mk_list_entry(output_plugin_iterator, - struct flb_output_instance, - _head); - - backlog = (struct sb_out_queue *) \ - flb_calloc(1, sizeof(struct sb_out_queue)); - - if (backlog == NULL) { - sb_destroy_backlogs(context); - - return -1; - } - - backlog->ins = output_plugin; - - mk_list_init(&backlog->chunks); - - mk_list_add(&backlog->_head, &context->backlogs); - } - - return 0; -} - -static void sb_destroy_backlogs(struct flb_sb *context) -{ - struct mk_list *backlog_iterator_tmp; - struct mk_list *backlog_iterator; - struct sb_out_queue *backlog; - - mk_list_foreach_safe(backlog_iterator, backlog_iterator_tmp, &context->backlogs) { - backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); - - mk_list_del(&backlog->_head); - - sb_destroy_backlog(backlog, context); - - flb_free(backlog); - } -} - -static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance( - struct flb_output_instance *output_plugin, - struct flb_sb *context) -{ - struct mk_list *backlog_iterator; - struct sb_out_queue *backlog; - - mk_list_foreach(backlog_iterator, &context->backlogs) { - backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); - - if (output_plugin == backlog->ins) { - return backlog; - } - } - - return NULL; -} - -static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk, - struct sb_out_queue *backlog, - int destroy) -{ - struct mk_list *chunk_iterator_tmp; - struct mk_list *chunk_iterator; - struct sb_out_chunk *chunk; - - mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { - chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); - - if (chunk->chunk == target_chunk) { - mk_list_del(&chunk->_head); - - backlog->ins->fs_backlog_chunks_size -= cio_chunk_get_real_size(target_chunk); - - if (destroy) { - sb_destroy_chunk(chunk); - } - - break; - } - } -} - -static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *target_chunk, - struct flb_sb *context) -{ - struct mk_list *backlog_iterator; - struct sb_out_queue *backlog; - - mk_list_foreach(backlog_iterator, &context->backlogs) { - backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); - - sb_remove_chunk_from_segregated_backlog(target_chunk, backlog, FLB_TRUE); - } -} - -static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk, - struct cio_stream *stream, - size_t target_chunk_size, - struct sb_out_queue *backlog) -{ - struct sb_out_chunk *chunk; - - chunk = sb_allocate_chunk(target_chunk, stream, target_chunk_size); - - if (chunk == NULL) { - flb_errno(); - return -1; - } - - mk_list_add(&chunk->_head, &backlog->chunks); - - backlog->ins->fs_backlog_chunks_size += target_chunk_size; - - return 0; -} - -static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk, - struct cio_stream *stream, - struct flb_sb *context) -{ - struct flb_input_chunk dummy_input_chunk; - struct mk_list *tmp; - struct mk_list *head; - size_t chunk_size; - struct sb_out_queue *backlog; - int tag_len; - const char * tag_buf; - int result; - - memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); - - dummy_input_chunk.in = context->ins; - dummy_input_chunk.chunk = target_chunk; - - chunk_size = cio_chunk_get_real_size(target_chunk); - - if (chunk_size < 0) { - flb_warn("[storage backlog] could not get real size of chunk %s/%s", - stream->name, target_chunk->name); - return -1; - } - - result = flb_input_chunk_get_tag(&dummy_input_chunk, &tag_buf, &tag_len); - if (result == -1) { - flb_error("[storage backlog] could not retrieve chunk tag from %s/%s, " - "removing it from the queue", - stream->name, target_chunk->name); - return -2; - } - - flb_routes_mask_set_by_tag(dummy_input_chunk.routes_mask, tag_buf, tag_len, - context->ins); - - mk_list_foreach_safe(head, tmp, &context->backlogs) { - backlog = mk_list_entry(head, struct sb_out_queue, _head); - if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, - backlog->ins->id)) { - result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, - chunk_size, backlog); - if (result) { - return -3; - } - } - } - - return 0; -} - -int sb_segregate_chunks(struct flb_config *config) -{ - int ret; - size_t size; - struct mk_list *tmp; - struct mk_list *stream_iterator; - struct mk_list *chunk_iterator; - int chunk_error; - struct flb_sb *context; - struct cio_stream *stream; - struct cio_chunk *chunk; - - context = sb_get_context(config); - - if (context == NULL) { - return 0; - } - - ret = sb_allocate_backlogs(context); - if (ret) { - return -2; - } - - mk_list_foreach(stream_iterator, &context->cio->streams) { - stream = mk_list_entry(stream_iterator, struct cio_stream, _head); - - mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) { - chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head); - - if (!cio_chunk_is_up(chunk)) { - ret = cio_chunk_up_force(chunk); - if (ret == CIO_CORRUPTED) { - if (config->storage_del_bad_chunks) { - chunk_error = cio_error_get(chunk); - - if (chunk_error == CIO_ERR_BAD_FILE_SIZE || - chunk_error == CIO_ERR_BAD_LAYOUT) - { - flb_plg_error(context->ins, "discarding irrecoverable chunk %s/%s", stream->name, chunk->name); - - cio_chunk_close(chunk, CIO_TRUE); - } - } - - continue; - } - } - - if (!cio_chunk_is_up(chunk)) { - return -3; - } - - /* try to segregate a chunk */ - ret = sb_append_chunk_to_segregated_backlogs(chunk, stream, context); - if (ret) { - /* - * if the chunk could not be segregated, just remove it from the - * queue and continue. - * - * if content size is zero, it's safe to 'delete it'. - */ - size = cio_chunk_get_content_size(chunk); - if (size <= 0) { - cio_chunk_close(chunk, CIO_TRUE); - } - else { - cio_chunk_close(chunk, CIO_FALSE); - } - continue; - } - - /* lock the chunk */ - flb_plg_info(context->ins, "register %s/%s", stream->name, chunk->name); - - cio_chunk_lock(chunk); - cio_chunk_down(chunk); - } - } - - return 0; -} - -ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, - size_t required_space) -{ - ssize_t releasable_space; - struct mk_list *chunk_iterator; - struct flb_sb *context; - struct sb_out_queue *backlog; - struct sb_out_chunk *chunk; - - context = sb_get_context(output_plugin->config); - - if (context == NULL) { - return 0; - } - - backlog = sb_find_segregated_backlog_by_output_plugin_instance( - output_plugin, context); - - if (backlog == NULL) { - return 0; - } - - releasable_space = 0; - - mk_list_foreach(chunk_iterator, &backlog->chunks) { - chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); - - releasable_space += chunk->size; - - if (releasable_space >= required_space) { - break; - } - } - - return releasable_space; -} - -int sb_release_output_queue_space(struct flb_output_instance *output_plugin, - ssize_t *required_space) -{ - struct mk_list *chunk_iterator_tmp; - struct cio_chunk *underlying_chunk; - struct mk_list *chunk_iterator; - size_t released_space; - struct flb_sb *context; - struct sb_out_queue *backlog; - struct sb_out_chunk *chunk; - - context = sb_get_context(output_plugin->config); - - if (context == NULL) { - return -1; - } - - backlog = sb_find_segregated_backlog_by_output_plugin_instance( - output_plugin, context); - - if (backlog == NULL) { - return -2; - } - - released_space = 0; - - mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { - chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); - - released_space += chunk->size; - underlying_chunk = chunk->chunk; - - sb_remove_chunk_from_segregated_backlogs(underlying_chunk, context); - cio_chunk_close(underlying_chunk, FLB_TRUE); - - if (released_space >= *required_space) { - break; - } - } - - *required_space -= released_space; - - return 0; -} - -/* Collection callback */ -static int cb_queue_chunks(struct flb_input_instance *in, - struct flb_config *config, void *data) -{ - size_t empty_output_queue_count; - struct mk_list *output_queue_iterator; - struct sb_out_queue *output_queue_instance; - struct sb_out_chunk *chunk_instance; - struct flb_sb *ctx; - struct flb_input_chunk *ic; - struct flb_input_chunk tmp_ic; - void *ch; - size_t total = 0; - ssize_t size; - int ret; - int event_type; - - /* Get context */ - ctx = (struct flb_sb *) data; - - /* Get the total number of bytes already enqueued */ - total = flb_input_chunk_total_size(in); - - /* If we already hitted our limit, just wait and re-check later */ - if (total >= ctx->mem_limit) { - return 0; - } - - empty_output_queue_count = 0; - - while (total < ctx->mem_limit && - empty_output_queue_count < mk_list_size(&ctx->backlogs)) { - empty_output_queue_count = 0; - - mk_list_foreach(output_queue_iterator, &ctx->backlogs) { - output_queue_instance = mk_list_entry(output_queue_iterator, - struct sb_out_queue, - _head); - - if (mk_list_is_empty(&output_queue_instance->chunks) != 0) { - chunk_instance = mk_list_entry_first(&output_queue_instance->chunks, - struct sb_out_chunk, - _head); - - /* Try to enqueue one chunk */ - /* - * All chunks on this backlog are 'file' based, always try to set - * them up. We validate the status. - */ - ret = cio_chunk_is_up(chunk_instance->chunk); - - if (ret == CIO_FALSE) { - ret = cio_chunk_up_force(chunk_instance->chunk); - - if (ret == CIO_CORRUPTED) { - flb_plg_error(ctx->ins, "removing corrupted chunk from the " - "queue %s:%s", - chunk_instance->stream->name, chunk_instance->chunk->name); - cio_chunk_close(chunk_instance->chunk, FLB_FALSE); - sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); - /* This function will indirecly release chunk_instance so it has to be - * called last. - */ - continue; - } - else if (ret == CIO_ERROR || ret == CIO_RETRY) { - continue; - } - } - - /* - * Map the chunk file context into a temporary buffer since the - * flb_input_chunk_get_event_type() interface needs an - * struct fb_input_chunk argument. - */ - tmp_ic.chunk = chunk_instance->chunk; - - /* Retrieve the event type: FLB_INPUT_LOGS, FLB_INPUT_METRICS of FLB_INPUT_TRACES */ - ret = flb_input_chunk_get_event_type(&tmp_ic); - if (ret == -1) { - flb_plg_error(ctx->ins, "removing chunk with wrong metadata " - "from the queue %s:%s", - chunk_instance->stream->name, - chunk_instance->chunk->name); - cio_chunk_close(chunk_instance->chunk, FLB_TRUE); - sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, - ctx); - continue; - } - event_type = ret; - - /* get the number of bytes being used by the chunk */ - size = cio_chunk_get_content_size(chunk_instance->chunk); - if (size <= 0) { - flb_plg_error(ctx->ins, "removing empty chunk from the " - "queue %s:%s", - chunk_instance->stream->name, chunk_instance->chunk->name); - cio_chunk_close(chunk_instance->chunk, FLB_TRUE); - sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); - /* This function will indirecly release chunk_instance so it has to be - * called last. - */ - continue; - } - - ch = chunk_instance->chunk; - - /* Associate this backlog chunk to this instance into the engine */ - ic = flb_input_chunk_map(in, event_type, ch); - if (!ic) { - flb_plg_error(ctx->ins, "removing chunk %s:%s from the queue", - chunk_instance->stream->name, chunk_instance->chunk->name); - cio_chunk_down(chunk_instance->chunk); - - /* - * If the file cannot be mapped, just drop it. Failures are all - * associated with data corruption. - */ - cio_chunk_close(chunk_instance->chunk, FLB_TRUE); - sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); - /* This function will indirecly release chunk_instance so it has to be - * called last. - */ - continue; - } - - flb_plg_info(ctx->ins, "queueing %s:%s", - chunk_instance->stream->name, chunk_instance->chunk->name); - - /* We are removing this chunk reference from this specific backlog - * queue but we need to leave it in the remainder queues. - */ - sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); - cio_chunk_down(ch); - - /* check our limits */ - total += size; - } - else { - empty_output_queue_count++; - } - } - } - - return 0; -} - -/* Initialize plugin */ -static int cb_sb_init(struct flb_input_instance *in, - struct flb_config *config, void *data) -{ - int ret; - char mem[32]; - struct flb_sb *ctx; - - ctx = flb_malloc(sizeof(struct flb_sb)); - if (!ctx) { - flb_errno(); - return -1; - } - - ctx->cio = data; - ctx->ins = in; - ctx->mem_limit = flb_utils_size_to_bytes(config->storage_bl_mem_limit); - - mk_list_init(&ctx->backlogs); - - flb_utils_bytes_to_human_readable_size(ctx->mem_limit, mem, sizeof(mem) - 1); - flb_plg_info(ctx->ins, "queue memory limit: %s", mem); - - /* export plugin context */ - flb_input_set_context(in, ctx); - - /* Set a collector to trigger the callback to queue data every second */ - ret = flb_input_set_collector_time(in, cb_queue_chunks, 1, 0, config); - if (ret < 0) { - flb_plg_error(ctx->ins, "could not create collector"); - flb_free(ctx); - return -1; - } - ctx->coll_fd = ret; - - return 0; -} - -static void cb_sb_pause(void *data, struct flb_config *config) -{ - struct flb_sb *ctx = data; - flb_input_collector_pause(ctx->coll_fd, ctx->ins); -} - -static void cb_sb_resume(void *data, struct flb_config *config) -{ - struct flb_sb *ctx = data; - flb_input_collector_resume(ctx->coll_fd, ctx->ins); -} - -static int cb_sb_exit(void *data, struct flb_config *config) -{ - struct flb_sb *ctx = data; - - flb_input_collector_pause(ctx->coll_fd, ctx->ins); - - sb_destroy_backlogs(ctx); - - flb_free(ctx); - - return 0; -} - -/* Plugin reference */ -struct flb_input_plugin in_storage_backlog_plugin = { - .name = "storage_backlog", - .description = "Storage Backlog", - .cb_init = cb_sb_init, - .cb_pre_run = NULL, - .cb_collect = NULL, - .cb_ingest = NULL, - .cb_flush_buf = NULL, - .cb_pause = cb_sb_pause, - .cb_resume = cb_sb_resume, - .cb_exit = cb_sb_exit, - - /* This plugin can only be configured and invoked by the Engine */ - .flags = FLB_INPUT_PRIVATE -}; |