diff options
Diffstat (limited to 'fluent-bit/src/flb_input_chunk.c')
-rw-r--r-- | fluent-bit/src/flb_input_chunk.c | 2009 |
1 files changed, 0 insertions, 2009 deletions
diff --git a/fluent-bit/src/flb_input_chunk.c b/fluent-bit/src/flb_input_chunk.c deleted file mode 100644 index c71ae3ef0..000000000 --- a/fluent-bit/src/flb_input_chunk.c +++ /dev/null @@ -1,2009 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define FS_CHUNK_SIZE_DEBUG(op) {flb_trace("[%d] %s -> fs_chunks_size = %zu", \ - __LINE__, op->name, op->fs_chunks_size);} -#define FS_CHUNK_SIZE_DEBUG_MOD(op, chunk, mod) {flb_trace( \ - "[%d] %s -> fs_chunks_size = %zu mod=%zd chunk=%s", __LINE__, \ - op->name, op->fs_chunks_size, mod, flb_input_chunk_get_name(chunk));} - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_config.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_input_chunk.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_storage.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_router.h> -#include <fluent-bit/flb_task.h> -#include <fluent-bit/flb_routes_mask.h> -#include <fluent-bit/flb_metrics.h> -#include <fluent-bit/stream_processor/flb_sp.h> -#include <fluent-bit/flb_ring_buffer.h> -#include <chunkio/chunkio.h> -#include <monkey/mk_core.h> - - -#ifdef FLB_HAVE_CHUNK_TRACE -#include <fluent-bit/flb_chunk_trace.h> -#endif /* FLB_HAVE_CHUNK_TRACE */ - - -#define BLOCK_UNTIL_KEYPRESS() {char temp_keypress_buffer; read(0, &temp_keypress_buffer, 1);} - -#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL 0 -#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1 - -struct input_chunk_raw { - struct flb_input_instance *ins; - int event_type; - size_t records; - flb_sds_t tag; - void *buf_data; - size_t buf_size; -}; - -#ifdef FLB_HAVE_IN_STORAGE_BACKLOG - -extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, - size_t required_space); - -extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin, - ssize_t *required_space); - - -#else - -ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, - size_t required_space) -{ - return 0; -} - -int sb_release_output_queue_space(struct flb_output_instance *output_plugin, - ssize_t *required_space) -{ - return 0; -} - -#endif - -static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, - struct flb_input_chunk *old_ic, - uint64_t o_id); - -static int flb_input_chunk_is_task_safe_delete(struct flb_task *task); - -static int flb_input_chunk_drop_task_route( - struct flb_task *task, - struct flb_output_instance *o_ins); - -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); - -static int flb_input_chunk_release_space( - struct flb_input_chunk *new_input_chunk, - struct flb_input_instance *input_plugin, - struct flb_output_instance *output_plugin, - ssize_t *required_space, - int release_scope) -{ - struct mk_list *input_chunk_iterator_tmp; - struct mk_list *input_chunk_iterator; - int chunk_destroy_flag; - struct flb_input_chunk *old_input_chunk; - ssize_t released_space; - int chunk_released; - ssize_t chunk_size; - - released_space = 0; - - mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, - &input_plugin->chunks) { - old_input_chunk = mk_list_entry(input_chunk_iterator, - struct flb_input_chunk, _head); - - if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, - output_plugin->id)) { - continue; - } - - if (flb_input_chunk_safe_delete(new_input_chunk, - old_input_chunk, - output_plugin->id) == FLB_FALSE) { - continue; - } - - if (flb_input_chunk_drop_task_route(old_input_chunk->task, - output_plugin) == FLB_FALSE) { - continue; - } - - chunk_size = flb_input_chunk_get_real_size(old_input_chunk); - chunk_released = FLB_FALSE; - chunk_destroy_flag = FLB_FALSE; - - if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { - flb_routes_mask_clear_bit(old_input_chunk->routes_mask, - output_plugin->id); - - FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); - output_plugin->fs_chunks_size -= chunk_size; - - chunk_destroy_flag = flb_routes_mask_is_empty( - old_input_chunk->routes_mask); - - chunk_released = FLB_TRUE; - } - else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { - chunk_destroy_flag = FLB_TRUE; - } - - if (chunk_destroy_flag) { - if (old_input_chunk->task != NULL) { - /* - * If the chunk is referenced by a task and task has no active route, - * we need to destroy the task as well. - */ - if (old_input_chunk->task->users == 0) { - flb_debug("[task] drop task_id %d with no active route from input plugin %s", - old_input_chunk->task->id, new_input_chunk->in->name); - flb_task_destroy(old_input_chunk->task, FLB_TRUE); - - chunk_released = FLB_TRUE; - } - } - else { - flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", - flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); - - flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); - - chunk_released = FLB_TRUE; - } - } - - if (chunk_released) { - released_space += chunk_size; - } - - if (released_space >= *required_space) { - break; - } - } - - *required_space -= released_space; - - return 0; -} - -static void generate_chunk_name(struct flb_input_instance *in, - char *out_buf, int buf_size) -{ - struct flb_time tm; - (void) in; - - flb_time_get(&tm); - snprintf(out_buf, buf_size - 1, - "%i-%lu.%4lu.flb", - getpid(), - tm.tm.tv_sec, tm.tm.tv_nsec); -} - -ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic) -{ - return cio_chunk_get_content_size(ic->chunk); -} - -/* - * When chunk is set to DOWN from memory, data_size is set to 0 and - * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size - * is used to track the size of chunks in filesystem so we need to call - * cio_chunk_get_real_size to return the original size in the file system - */ -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) -{ - ssize_t meta_size; - ssize_t size; - - size = cio_chunk_get_real_size(ic->chunk); - - if (size != 0) { - return size; - } - - // Real size is not synced to chunk yet - size = flb_input_chunk_get_size(ic); - if (size == 0) { - flb_debug("[input chunk] no data in the chunk %s", - flb_input_chunk_get_name(ic)); - return -1; - } - - meta_size = cio_meta_size(ic->chunk); - size += meta_size - /* See https://github.com/edsiper/chunkio#file-layout for more details */ - + 2 /* HEADER BYTES */ - + 4 /* CRC32 */ - + 16 /* PADDING */ - + 2; /* METADATA LENGTH BYTES */ - - return size; -} - -int flb_input_chunk_write(void *data, const char *buf, size_t len) -{ - int ret; - struct flb_input_chunk *ic; - - ic = (struct flb_input_chunk *) data; - - ret = cio_chunk_write(ic->chunk, buf, len); - return ret; -} - -int flb_input_chunk_write_at(void *data, off_t offset, - const char *buf, size_t len) -{ - int ret; - struct flb_input_chunk *ic; - - ic = (struct flb_input_chunk *) data; - - ret = cio_chunk_write_at(ic->chunk, offset, buf, len); - return ret; -} - -static int flb_input_chunk_drop_task_route( - struct flb_task *task, - struct flb_output_instance *output_plugin) -{ - int route_status; - int result; - - if (task == NULL) { - return FLB_TRUE; - } - - result = FLB_TRUE; - - if (task->users != 0) { - result = FLB_FALSE; - - if (output_plugin != NULL) { - flb_task_acquire_lock(task); - - route_status = flb_task_get_route_status(task, output_plugin); - - if (route_status == FLB_TASK_ROUTE_INACTIVE) { - flb_task_set_route_status(task, - output_plugin, - FLB_TASK_ROUTE_DROPPED); - - result = FLB_TRUE; - } - - flb_task_release_lock(task); - } - } - - return result; -} - - -/* - * For input_chunk referenced by an outgoing task, we need to check - * whether the chunk is in the middle of output flush callback - */ -static int flb_input_chunk_is_task_safe_delete(struct flb_task *task) -{ - if (!task) { - return FLB_TRUE; - } - - if (task->users != 0) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, - struct flb_input_chunk *old_ic, - uint64_t o_id) -{ - /* The chunk we want to drop should not be the incoming chunk */ - if (ic == old_ic) { - return FLB_FALSE; - } - - /* - * Even if chunks from same input plugin have same routes_mask when created, - * the routes_mask could be modified when new chunks is ingested. Therefore, - * we still need to do the validation on the routes_mask with o_id. - */ - if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -int flb_input_chunk_release_space_compound( - struct flb_input_chunk *new_input_chunk, - struct flb_output_instance *output_plugin, - size_t *local_release_requirement, - int release_local_space) -{ - ssize_t required_space_remainder; - struct flb_input_instance *storage_backlog_instance; - struct flb_input_instance *input_plugin_instance; - struct mk_list *iterator; - int result; - - storage_backlog_instance = output_plugin->config->storage_input_plugin; - - *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk); - required_space_remainder = (ssize_t) *local_release_requirement; - - if (required_space_remainder > 0) { - result = flb_input_chunk_release_space(new_input_chunk, - storage_backlog_instance, - output_plugin, - &required_space_remainder, - FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL); - } - - if (required_space_remainder > 0) { - result = sb_release_output_queue_space(output_plugin, - &required_space_remainder); - } - - if (release_local_space) { - if (required_space_remainder > 0) { - result = flb_input_chunk_release_space(new_input_chunk, - new_input_chunk->in, - output_plugin, - &required_space_remainder, - FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL); - } - } - - if (required_space_remainder > 0) { - mk_list_foreach(iterator, &output_plugin->config->inputs) { - input_plugin_instance = \ - mk_list_entry(iterator, struct flb_input_instance, _head); - - if (input_plugin_instance != new_input_chunk->in) { - result = flb_input_chunk_release_space( - new_input_chunk, - input_plugin_instance, - output_plugin, - &required_space_remainder, - FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL); - } - - if (required_space_remainder <= 0) { - break; - } - } - } - - if (required_space_remainder < 0) { - required_space_remainder = 0; - } - - *local_release_requirement = (size_t) required_space_remainder; - - (void) result; - - return 0; -} - -/* - * Find a slot in the output instance to append the new data with size chunk_size, it - * will drop the the oldest chunks when the limitation on local disk is reached. - */ -int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, - size_t chunk_size, int overlimit) -{ - int count; - int result; - struct mk_list *head; - struct flb_output_instance *o_ins; - size_t local_release_requirement; - - /* - * For each output instances that will be over the limit after adding the new chunk, - * we have to determine how many chunks needs to be removed. We will adjust the - * routes_mask to only route to the output plugin that have enough space after - * deleting some chunks fome the queue. - */ - count = 0; - - mk_list_foreach(head, &ic->in->config->outputs) { - o_ins = mk_list_entry(head, struct flb_output_instance, _head); - - if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 || - (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) { - continue; - } - - local_release_requirement = 0; - - result = flb_input_chunk_release_space_compound( - ic, o_ins, - &local_release_requirement, - FLB_TRUE); - - if (result != 0 || - local_release_requirement != 0) { - count++; - } - } - - if (count != 0) { - flb_error("[input chunk] fail to drop enough chunks in order to place new data"); - exit(0); - } - - return 0; -} - -/* - * Returns a non-zero result if any output instances will reach the limit - * after buffering the new data - */ -int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic, - size_t chunk_size) -{ - int overlimit = 0; - struct mk_list *head; - struct flb_output_instance *o_ins; - - mk_list_foreach(head, &ic->in->config->outputs) { - o_ins = mk_list_entry(head, struct flb_output_instance, _head); - - if ((o_ins->total_limit_size == -1) || - (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) { - continue; - } - - FS_CHUNK_SIZE_DEBUG(o_ins); - flb_debug("[input chunk] chunk %s required %ld bytes and %ld bytes left " - "in plugin %s", flb_input_chunk_get_name(ic), chunk_size, - o_ins->total_limit_size - - o_ins->fs_backlog_chunks_size - - o_ins->fs_chunks_size, - o_ins->name); - - if ((o_ins->fs_chunks_size + - o_ins->fs_backlog_chunks_size + - chunk_size) > o_ins->total_limit_size) { - overlimit |= (1 << o_ins->id); - } - } - - return overlimit; -} - -/* Find a slot for the incoming data to buffer it in local file system - * returns 0 if none of the routes can be written to - */ -int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size) -{ - int overlimit; - overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size); - if (overlimit != 0) { - flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit); - } - - return !flb_routes_mask_is_empty(ic->routes_mask); -} - -/* Create an input chunk using a Chunk I/O */ -struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, - int event_type, - void *chunk) -{ - int records = 0; - int tag_len; - int has_routes; - int ret; - uint64_t ts; - char *buf_data; - size_t buf_size; - size_t offset; - ssize_t bytes; - const char *tag_buf; - struct flb_input_chunk *ic; - - /* Create context for the input instance */ - ic = flb_calloc(1, sizeof(struct flb_input_chunk)); - if (!ic) { - flb_errno(); - return NULL; - } - ic->event_type = event_type; - ic->busy = FLB_FALSE; - ic->fs_counted = FLB_FALSE; - ic->fs_backlog = FLB_TRUE; - ic->chunk = chunk; - ic->in = in; - msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write); - - ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size); - if (ret != CIO_OK) { - flb_error("[input chunk] error retrieving content for metrics"); - flb_free(ic); - return NULL; - } - - if (ic->event_type == FLB_INPUT_LOGS) { - /* Validate records in the chunk */ - ret = flb_mp_validate_log_chunk(buf_data, buf_size, &records, &offset); - if (ret == -1) { - /* If there are valid records, truncate the chunk size */ - if (records <= 0) { - flb_plg_error(in, - "chunk validation failed, data might be corrupted. " - "No valid records found, the chunk will be discarded."); - flb_free(ic); - return NULL; - } - if (records > 0 && offset > 32) { - flb_plg_warn(in, - "chunk validation failed, data might be corrupted. " - "Found %d valid records, failed content starts " - "right after byte %lu. Recovering valid records.", - records, offset); - - /* truncate the chunk to recover valid records */ - cio_chunk_write_at(chunk, offset, NULL, 0); - } - else { - flb_plg_error(in, - "chunk validation failed, data might be corrupted. " - "Found %d valid records, failed content starts " - "right after byte %lu. Cannot recover chunk,", - records, offset); - flb_free(ic); - return NULL; - } - } - } - else if (ic->event_type == FLB_INPUT_METRICS) { - ret = flb_mp_validate_metric_chunk(buf_data, buf_size, &records, &offset); - if (ret == -1) { - if (records <= 0) { - flb_plg_error(in, - "metrics chunk validation failed, data might be corrupted. " - "No valid records found, the chunk will be discarded."); - flb_free(ic); - return NULL; - } - if (records > 0 && offset > 32) { - flb_plg_warn(in, - "metrics chunk validation failed, data might be corrupted. " - "Found %d valid records, failed content starts " - "right after byte %lu. Recovering valid records.", - records, offset); - - /* truncate the chunk to recover valid records */ - cio_chunk_write_at(chunk, offset, NULL, 0); - } - else { - flb_plg_error(in, - "metrics chunk validation failed, data might be corrupted. " - "Found %d valid records, failed content starts " - "right after byte %lu. Cannot recover chunk,", - records, offset); - flb_free(ic); - return NULL; - } - - } - } - else if (ic->event_type == FLB_INPUT_TRACES) { - - } - - /* Skip chunks without content data */ - if (records == 0) { - flb_plg_error(in, - "chunk validation failed, data might be corrupted. " - "No valid records found, the chunk will be discarded."); - flb_free(ic); - return NULL; - } - - /* - * If the content is valid and the chunk has extra padding zeros, just - * perform an adjustment. - */ - bytes = cio_chunk_get_content_size(chunk); - if (bytes == -1) { - flb_free(ic); - return NULL; - } - if (offset < bytes) { - cio_chunk_write_at(chunk, offset, NULL, 0); - } - - /* Update metrics */ -#ifdef FLB_HAVE_METRICS - ic->total_records = records; - if (ic->total_records > 0) { - /* timestamp */ - ts = cfl_time_now(); - - /* fluentbit_input_records_total */ - cmt_counter_add(in->cmt_records, ts, ic->total_records, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* fluentbit_input_bytes_total */ - cmt_counter_add(in->cmt_bytes, ts, buf_size, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* OLD metrics */ - flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics); - flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); - } -#endif - - /* Get the the tag reference (chunk metadata) */ - ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len); - if (ret == -1) { - flb_error("[input chunk] error retrieving tag of input chunk"); - flb_free(ic); - return NULL; - } - - bytes = flb_input_chunk_get_real_size(ic); - if (bytes < 0) { - flb_warn("[input chunk] could not retrieve chunk real size"); - flb_free(ic); - return NULL; - } - - has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in); - if (has_routes == 0) { - flb_warn("[input chunk] no matching route for backoff log chunk %s", - flb_input_chunk_get_name(ic)); - } - - mk_list_add(&ic->_head, &in->chunks); - - flb_input_chunk_update_output_instances(ic, bytes); - - return ic; -} - -static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, - char *tag, int tag_len) - -{ - int ret; - int meta_size; - char *meta; - - /* - * Prepare the Chunk metadata header - * ---------------------------------- - * m[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0 - * m[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1 - * m[2] = type (FLB_INPUT_CHUNK_TYPE_LOG or FLB_INPUT_CHUNK_TYPE_METRIC or FLB_INPUT_CHUNK_TYPE_TRACE - * m[3] = 0 (unused for now) - */ - - /* write metadata (tag) */ - if (tag_len > (65535 - FLB_INPUT_CHUNK_META_HEADER)) { - /* truncate length */ - tag_len = 65535 - FLB_INPUT_CHUNK_META_HEADER; - } - meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len; - - /* Allocate buffer for metadata header */ - meta = flb_calloc(1, meta_size); - if (!meta) { - flb_errno(); - return -1; - } - - /* - * Write chunk header in a temporary buffer - * ---------------------------------------- - */ - - /* magic bytes */ - meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0; - meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1; - - /* event type */ - if (event_type == FLB_INPUT_LOGS) { - meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS; - } - else if (event_type == FLB_INPUT_METRICS) { - meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS; - } - else if (event_type == FLB_INPUT_TRACES) { - meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES; - } - - /* unused byte */ - meta[3] = 0; - - /* copy the tag after magic bytes */ - memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len); - - /* Write tag into metadata section */ - ret = cio_meta_write(chunk, (char *) meta, meta_size); - if (ret == -1) { - flb_error("[input chunk] could not write metadata"); - flb_free(meta); - return -1; - } - flb_free(meta); - - return 0; -} - -struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, - const char *tag, int tag_len) -{ - int ret; - int err; - int set_down = FLB_FALSE; - int has_routes; - char name[64]; - struct cio_chunk *chunk; - struct flb_storage_input *storage; - struct flb_input_chunk *ic; - - storage = in->storage; - - /* chunk name */ - generate_chunk_name(in, name, sizeof(name) - 1); - - /* open/create target chunk file */ - chunk = cio_chunk_open(storage->cio, storage->stream, name, - CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err); - if (!chunk) { - flb_error("[input chunk] could not create chunk file: %s:%s", - storage->stream->name, name); - return NULL; - } - /* - * If the returned chunk at open is 'down', just put it up, write the - * content and set it down again. - */ - ret = cio_chunk_is_up(chunk); - if (ret == CIO_FALSE) { - ret = cio_chunk_up_force(chunk); - if (ret == -1) { - cio_chunk_close(chunk, CIO_TRUE); - return NULL; - } - set_down = FLB_TRUE; - } - - /* Write chunk header */ - ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len); - if (ret == -1) { - cio_chunk_close(chunk, CIO_TRUE); - return NULL; - } - - /* Create context for the input instance */ - ic = flb_calloc(1, sizeof(struct flb_input_chunk)); - if (!ic) { - flb_errno(); - cio_chunk_close(chunk, CIO_TRUE); - return NULL; - } - - /* - * Check chunk content type to be created: depending of the value set by - * the input plugin, this can be FLB_INPUT_LOGS, FLB_INPUT_METRICS or - * FLB_INPUT_TRACES. - */ - ic->event_type = event_type; - ic->busy = FLB_FALSE; - ic->fs_counted = FLB_FALSE; - ic->chunk = chunk; - ic->fs_backlog = FLB_FALSE; - ic->in = in; - ic->stream_off = 0; - ic->task = NULL; -#ifdef FLB_HAVE_METRICS - ic->total_records = 0; -#endif - - /* Calculate the routes_mask for the input chunk */ - has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in); - if (has_routes == 0) { - flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'", - flb_input_chunk_get_name(ic), tag); - } - - msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write); - mk_list_add(&ic->_head, &in->chunks); - - if (set_down == FLB_TRUE) { - cio_chunk_down(chunk); - } - - if (event_type == FLB_INPUT_LOGS) { - flb_hash_table_add(in->ht_log_chunks, tag, tag_len, ic, 0); - } - else if (event_type == FLB_INPUT_METRICS) { - flb_hash_table_add(in->ht_metric_chunks, tag, tag_len, ic, 0); - } - else if (event_type == FLB_INPUT_TRACES) { - flb_hash_table_add(in->ht_trace_chunks, tag, tag_len, ic, 0); - } - - return ic; -} - -int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, - const char *tag_buf, int tag_len, - int del) -{ - ssize_t bytes; - struct mk_list *head; - struct flb_output_instance *o_ins; - - mk_list_foreach(head, &ic->in->config->outputs) { - o_ins = mk_list_entry(head, struct flb_output_instance, _head); - - if (o_ins->total_limit_size == -1) { - continue; - } - - bytes = flb_input_chunk_get_real_size(ic); - if (bytes == -1) { - // no data in the chunk - continue; - } - - if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { - if (ic->fs_counted == FLB_TRUE) { - FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); - o_ins->fs_chunks_size -= bytes; - flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, " - "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), - bytes, o_ins->name, o_ins->fs_chunks_size); - } - } - } - - if (del == CIO_TRUE && tag_buf) { - /* - * "TRY" to delete any reference to this chunk ('ic') from the hash - * table. Note that maybe the value is not longer available in the - * entries if it was replaced: note that we always keep the last - * chunk for a specific Tag. - */ - if (ic->event_type == FLB_INPUT_LOGS) { - flb_hash_table_del_ptr(ic->in->ht_log_chunks, - tag_buf, tag_len, (void *) ic); - } - else if (ic->event_type == FLB_INPUT_METRICS) { - flb_hash_table_del_ptr(ic->in->ht_metric_chunks, - tag_buf, tag_len, (void *) ic); - } - else if (ic->event_type == FLB_INPUT_TRACES) { - flb_hash_table_del_ptr(ic->in->ht_trace_chunks, - tag_buf, tag_len, (void *) ic); - } - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ic->trace != NULL) { - flb_chunk_trace_destroy(ic->trace); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - cio_chunk_close(ic->chunk, del); - mk_list_del(&ic->_head); - flb_free(ic); - - return 0; -} - - -int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) -{ - int tag_len; - int ret; - ssize_t bytes; - const char *tag_buf = NULL; - struct mk_list *head; - struct flb_output_instance *o_ins; - - if (flb_input_chunk_is_up(ic) == FLB_FALSE) { - flb_input_chunk_set_up(ic); - } - - mk_list_foreach(head, &ic->in->config->outputs) { - o_ins = mk_list_entry(head, struct flb_output_instance, _head); - - if (o_ins->total_limit_size == -1) { - continue; - } - - bytes = flb_input_chunk_get_real_size(ic); - if (bytes == -1) { - // no data in the chunk - continue; - } - - if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { - if (ic->fs_counted == FLB_TRUE) { - FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); - o_ins->fs_chunks_size -= bytes; - flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, " - "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), - bytes, o_ins->name, o_ins->fs_chunks_size); - } - } - } - - /* - * When a chunk is going to be destroyed, this can be in a down state, - * since the next step is to retrieve the Tag we need to have the - * content up. - */ - ret = flb_input_chunk_is_up(ic); - if (ret == FLB_FALSE) { - ret = cio_chunk_up_force(ic->chunk); - if (ret == -1) { - flb_error("[input chunk] cannot load chunk: %s", - flb_input_chunk_get_name(ic)); - } - } - - /* Retrieve Tag */ - ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len); - if (ret == -1) { - flb_trace("[input chunk] could not retrieve chunk tag: %s", - flb_input_chunk_get_name(ic)); - } - - if (del == CIO_TRUE && tag_buf) { - /* - * "TRY" to delete any reference to this chunk ('ic') from the hash - * table. Note that maybe the value is not longer available in the - * entries if it was replaced: note that we always keep the last - * chunk for a specific Tag. - */ - if (ic->event_type == FLB_INPUT_LOGS) { - flb_hash_table_del_ptr(ic->in->ht_log_chunks, - tag_buf, tag_len, (void *) ic); - } - else if (ic->event_type == FLB_INPUT_METRICS) { - flb_hash_table_del_ptr(ic->in->ht_metric_chunks, - tag_buf, tag_len, (void *) ic); - } - else if (ic->event_type == FLB_INPUT_TRACES) { - flb_hash_table_del_ptr(ic->in->ht_trace_chunks, - tag_buf, tag_len, (void *) ic); - } - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ic->trace != NULL) { - flb_chunk_trace_destroy(ic->trace); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - cio_chunk_close(ic->chunk, del); - mk_list_del(&ic->_head); - flb_free(ic); - - return 0; -} - -/* Return or create an available chunk to write data */ -static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, - int event_type, - const char *tag, int tag_len, - size_t chunk_size, int *set_down) -{ - int id = -1; - int ret; - int new_chunk = FLB_FALSE; - size_t out_size; - struct flb_input_chunk *ic = NULL; - - if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) { - flb_plg_warn(in, - "Tag set exceeds limit, truncating from %i to %i bytes", - tag_len, FLB_INPUT_CHUNK_TAG_MAX); - tag_len = FLB_INPUT_CHUNK_TAG_MAX; - } - - if (event_type == FLB_INPUT_LOGS) { - id = flb_hash_table_get(in->ht_log_chunks, tag, tag_len, - (void *) &ic, &out_size); - } - else if (event_type == FLB_INPUT_METRICS) { - id = flb_hash_table_get(in->ht_metric_chunks, tag, tag_len, - (void *) &ic, &out_size); - } - else if (event_type == FLB_INPUT_TRACES) { - id = flb_hash_table_get(in->ht_trace_chunks, tag, tag_len, - (void *) &ic, &out_size); - } - - if (id >= 0) { - if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) { - ic = NULL; - } - else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { - ret = cio_chunk_up_force(ic->chunk); - - if (ret == CIO_CORRUPTED) { - if (in->config->storage_del_bad_chunks) { - /* If the chunk is corrupted we need to discard it and - * set ic to NULL so the system tries to allocate a new - * chunk. - */ - - flb_error("[input chunk] discarding corrupted chunk"); - } - - flb_input_chunk_destroy_corrupted(ic, - tag, tag_len, - in->config->storage_del_bad_chunks); - - ic = NULL; - } - else if (ret != CIO_OK) { - ic = NULL; - } - - *set_down = FLB_TRUE; - } - } - - /* No chunk was found, we need to create a new one */ - if (!ic) { - ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len); - new_chunk = FLB_TRUE; - if (!ic) { - return NULL; - } - ic->event_type = event_type; - } - - /* - * If buffering this block of data will exceed one of the limit among all output instances - * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks - * (based in creation time) to get enough space for the incoming chunk. - */ - if (!flb_routes_mask_is_empty(ic->routes_mask) - && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { - /* - * If the chunk is not newly created, the chunk might already have logs inside. - * We cannot delete (reused) chunks here. - * If the routes_mask is cleared after trying to append new data, we destroy - * the chunk. - */ - if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) { - flb_input_chunk_destroy(ic, FLB_TRUE); - } - return NULL; - } - - return ic; -} - -static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i) -{ - if (i->mem_buf_limit <= 0) { - return FLB_FALSE; - } - - if (i->mem_chunks_size >= i->mem_buf_limit) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i) -{ - struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; - - if (storage->type == FLB_STORAGE_FS) { - if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { - if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { - return FLB_TRUE; - } - } - } - - return FLB_FALSE; -} - -/* - * Check all chunks associated to the input instance and summarize - * the number of bytes in use. - */ -size_t flb_input_chunk_total_size(struct flb_input_instance *in) -{ - size_t total = 0; - struct flb_storage_input *storage; - - storage = (struct flb_storage_input *) in->storage; - total = cio_stream_size_chunks_up(storage->stream); - return total; -} - -/* - * Count and update the number of bytes being used by the instance. Also - * check if the instance is paused, if so, check if it can be resumed if - * is not longer over the limits. - * - * It always returns the number of bytes in use. - */ -size_t flb_input_chunk_set_limits(struct flb_input_instance *in) -{ - size_t total; - - /* Gather total number of enqueued bytes */ - total = flb_input_chunk_total_size(in); - - /* Register the total into the context variable */ - in->mem_chunks_size = total; - - /* - * After the adjustments, validate if the plugin is overlimit or paused - * and perform further adjustments. - */ - if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE && - in->config->is_running == FLB_TRUE && - in->config->is_ingestion_active == FLB_TRUE && - in->mem_buf_status == FLB_INPUT_PAUSED) { - in->mem_buf_status = FLB_INPUT_RUNNING; - if (in->p->cb_resume) { - flb_input_resume(in); - flb_info("[input] %s resume (mem buf overlimit)", - in->name); - } - } - if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && - in->config->is_running == FLB_TRUE && - in->config->is_ingestion_active == FLB_TRUE && - in->storage_buf_status == FLB_INPUT_PAUSED) { - in->storage_buf_status = FLB_INPUT_RUNNING; - if (in->p->cb_resume) { - flb_input_resume(in); - flb_info("[input] %s resume (storage buf overlimit %zu/%zu)", - in->name, - ((struct flb_storage_input *)in->storage)->cio->total_chunks_up, - ((struct flb_storage_input *)in->storage)->cio->max_chunks_up); - } - } - - return total; -} - -/* - * If the number of bytes in use by the chunks are over the imposed limit - * by configuration, pause the instance. - */ -static inline int flb_input_chunk_protect(struct flb_input_instance *i) -{ - struct flb_storage_input *storage = i->storage; - - if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { - flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", - i->name, - storage->cio->total_chunks_up, - storage->cio->max_chunks_up); - flb_input_pause(i); - i->storage_buf_status = FLB_INPUT_PAUSED; - return FLB_TRUE; - } - - if (storage->type == FLB_STORAGE_FS) { - return FLB_FALSE; - } - - if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { - /* - * if the plugin is already overlimit and the strategy is based on - * a memory-ring-buffer logic, do not pause the plugin, upon next - * try of ingestion 'memrb' will make sure to release some bytes. - */ - if (i->storage_type == FLB_STORAGE_MEMRB) { - return FLB_FALSE; - } - - /* - * The plugin is using 'memory' buffering only and already reached - * it limit, just pause the ingestion. - */ - flb_warn("[input] %s paused (mem buf overlimit)", - i->name); - flb_input_pause(i); - i->mem_buf_status = FLB_INPUT_PAUSED; - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* - * Validate if the chunk coming from the input plugin based on config and - * resources usage must be 'up' or 'down' (applicable for filesystem storage - * type). - * - * FIXME: can we find a better name for this function ? - */ -int flb_input_chunk_set_up_down(struct flb_input_chunk *ic) -{ - size_t total; - struct flb_input_instance *in; - - in = ic->in; - - /* Gather total number of enqueued bytes */ - total = flb_input_chunk_total_size(in); - - /* Register the total into the context variable */ - in->mem_chunks_size = total; - - if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) { - if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) { - cio_chunk_down(ic->chunk); - - /* Adjust new counters */ - total = flb_input_chunk_total_size(ic->in); - in->mem_chunks_size = total; - - return FLB_FALSE; - } - } - - return FLB_TRUE; -} - -int flb_input_chunk_is_up(struct flb_input_chunk *ic) -{ - return cio_chunk_is_up(ic->chunk); -} - -int flb_input_chunk_down(struct flb_input_chunk *ic) -{ - if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) { - return cio_chunk_down(ic->chunk); - } - - return 0; -} - -int flb_input_chunk_set_up(struct flb_input_chunk *ic) -{ - if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { - return cio_chunk_up(ic->chunk); - } - - return 0; -} - -static int memrb_input_chunk_release_space(struct flb_input_instance *ins, - size_t required_space, - size_t *dropped_chunks, size_t *dropped_bytes) -{ - int ret; - int released; - size_t removed_chunks = 0; - ssize_t chunk_size; - ssize_t released_space = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_input_chunk *ic; - - mk_list_foreach_safe(head, tmp, &ins->chunks) { - ic = mk_list_entry(head, struct flb_input_chunk, _head); - - /* check if is there any task or no users associated */ - ret = flb_input_chunk_is_task_safe_delete(ic->task); - if (ret == FLB_FALSE) { - continue; - } - - /* get chunk size */ - chunk_size = flb_input_chunk_get_real_size(ic); - - released = FLB_FALSE; - if (ic->task != NULL) { - if (ic->task->users == 0) { - flb_task_destroy(ic->task, FLB_TRUE); - released = FLB_TRUE; - } - } - else { - flb_input_chunk_destroy(ic, FLB_TRUE); - released = FLB_TRUE; - } - - if (released) { - released_space += chunk_size; - removed_chunks++; - } - - if (released_space >= required_space) { - break; - } - } - - /* no matter if we succeeded or not, set the counters */ - *dropped_bytes = released_space; - *dropped_chunks = removed_chunks; - - /* set the final status of the operation */ - if (released_space >= required_space) { - return 0; - } - - return -1; -} - -/* Append a RAW MessagPack buffer to the input instance */ -static int input_chunk_append_raw(struct flb_input_instance *in, - int event_type, - size_t n_records, - const char *tag, size_t tag_len, - const void *buf, size_t buf_size) -{ - int ret; - int set_down = FLB_FALSE; - int min; - int new_chunk = FLB_FALSE; - uint64_t ts; - char *name; - size_t dropped_chunks; - size_t dropped_bytes; - size_t content_size; - size_t real_diff; - size_t real_size; - size_t pre_real_size; - struct flb_input_chunk *ic; - struct flb_storage_input *si; - - /* memory ring-buffer checker */ - if (in->storage_type == FLB_STORAGE_MEMRB) { - /* check if we are overlimit */ - ret = flb_input_chunk_is_mem_overlimit(in); - if (ret) { - /* reset counters */ - dropped_chunks = 0; - dropped_bytes = 0; - - /* try to release 'buf_size' */ - ret = memrb_input_chunk_release_space(in, buf_size, - &dropped_chunks, &dropped_bytes); - - /* update metrics if required */ - if (dropped_chunks > 0 || dropped_bytes > 0) { - /* timestamp and input plugin name for label */ - ts = cfl_time_now(); - name = (char *) flb_input_name(in); - - /* update counters */ - cmt_counter_add(in->cmt_memrb_dropped_chunks, ts, - dropped_chunks, 1, (char *[]) {name}); - - cmt_counter_add(in->cmt_memrb_dropped_bytes, ts, - dropped_bytes, 1, (char *[]) {name}); - } - - if (ret != 0) { - /* we could not allocate the required space, just return */ - return -1; - } - } - } - - /* Check if the input plugin has been paused */ - if (flb_input_buf_paused(in) == FLB_TRUE) { - flb_debug("[input chunk] %s is paused, cannot append records", - in->name); - return -1; - } - - if (buf_size == 0) { - flb_debug("[input chunk] skip ingesting data with 0 bytes"); - return -1; - } - - /* - * Some callers might not set a custom tag, on that case just inherit - * the fixed instance tag or instance name. - */ - if (!tag) { - if (in->tag && in->tag_len > 0) { - tag = in->tag; - tag_len = in->tag_len; - } - else { - tag = in->name; - tag_len = strlen(in->name); - } - } - - /* - * Get a target input chunk, can be one with remaining space available - * or a new one. - */ - ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down); - if (!ic) { - flb_error("[input chunk] no available chunk"); - return -1; - } - - /* newly created chunk */ - if (flb_input_chunk_get_size(ic) == 0) { - new_chunk = FLB_TRUE; - } - - /* We got the chunk, validate if is 'up' or 'down' */ - ret = flb_input_chunk_is_up(ic); - if (ret == FLB_FALSE) { - ret = cio_chunk_up_force(ic->chunk); - if (ret == -1) { - flb_error("[input chunk] cannot retrieve temporary chunk"); - return -1; - } - set_down = FLB_TRUE; - } - - /* - * Keep the previous real size to calculate the real size - * difference for flb_input_chunk_update_output_instances(), - * use 0 when the chunk is new since it's size will never - * have been calculated before. - */ - if (new_chunk == FLB_TRUE) { - pre_real_size = 0; - } - else { - pre_real_size = flb_input_chunk_get_real_size(ic); - } - - /* Write the new data */ - ret = flb_input_chunk_write(ic, buf, buf_size); - if (ret == -1) { - flb_error("[input chunk] error writing data from %s instance", - in->name); - cio_chunk_tx_rollback(ic->chunk); - return -1; - } - -#ifdef FLB_HAVE_CHUNK_TRACE - flb_chunk_trace_do_input(ic); -#endif /* FLB_HAVE_CHUNK_TRACE */ - - /* Update 'input' metrics */ -#ifdef FLB_HAVE_METRICS - if (ret == CIO_OK) { - ic->added_records = n_records; - ic->total_records += n_records; - } - - if (ic->total_records > 0) { - /* timestamp */ - ts = cfl_time_now(); - - /* fluentbit_input_records_total */ - cmt_counter_add(in->cmt_records, ts, ic->added_records, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* fluentbit_input_bytes_total */ - cmt_counter_add(in->cmt_bytes, ts, buf_size, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* OLD api */ - flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics); - flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); - } -#endif - - /* Apply filters */ - if (event_type == FLB_INPUT_LOGS) { - flb_filter_do(ic, - buf, buf_size, - tag, tag_len, in->config); - } - - /* get the chunks content size */ - content_size = cio_chunk_get_content_size(ic->chunk); - - /* - * There is a case that rewrite_tag will modify the tag and keep rule is set - * to drop the original record. The original record will still go through the - * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by - * metadata bytes (consisted by metadata bytes of the file chunk). This condition - * sets the diff to 0 in order to not update the fs_chunks_size. - */ - if (flb_input_chunk_get_size(ic) == 0) { - real_diff = 0; - } - - /* Lock buffers where size > 2MB */ - if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { - cio_chunk_lock(ic->chunk); - } - - /* Make sure the data was not filtered out and the buffer size is zero */ - if (content_size == 0) { - flb_input_chunk_destroy(ic, FLB_TRUE); - flb_input_chunk_set_limits(in); - return 0; - } -#ifdef FLB_HAVE_STREAM_PROCESSOR - else if (in->config->stream_processor_ctx && - ic->event_type == FLB_INPUT_LOGS) { - char *c_data; - size_t c_size; - - /* Retrieve chunk (filtered) output content */ - cio_chunk_get_content(ic->chunk, &c_data, &c_size); - - /* Invoke stream processor */ - flb_sp_do(in->config->stream_processor_ctx, - in, - tag, tag_len, - c_data + ic->stream_off, c_size - ic->stream_off); - ic->stream_off += (c_size - ic->stream_off); - } -#endif - - if (set_down == FLB_TRUE) { - cio_chunk_down(ic->chunk); - } - - /* - * If the instance is not routable, there is no need to keep the - * content in the storage engine, just get rid of it. - */ - if (in->routable == FLB_FALSE) { - flb_input_chunk_destroy(ic, FLB_TRUE); - return 0; - } - - /* Update memory counters and adjust limits if any */ - flb_input_chunk_set_limits(in); - - /* - * Check if we are overlimit and validate if is there any filesystem - * storage type asociated to this input instance, if so, unload the - * chunk content from memory to respect imposed limits. - * - * Calling cio_chunk_down() the memory map associated and the file - * descriptor will be released. At any later time, it must be bring up - * for I/O operations. - */ - si = (struct flb_storage_input *) in->storage; - if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE && - si->type == FLB_STORAGE_FS) { - if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) { - /* - * If we are already over limit, a sub-sequent data ingestion - * might need a Chunk to write data in. As an optimization we - * will put this Chunk down ONLY IF it has less than 1% of - * it capacity as available space, otherwise keep it 'up' so - * it available space can be used. - */ - content_size = cio_chunk_get_content_size(ic->chunk); - - /* Do we have less than 1% available ? */ - min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01); - if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) { - cio_chunk_down(ic->chunk); - } - } - } - - real_size = flb_input_chunk_get_real_size(ic); - real_diff = real_size - pre_real_size; - if (real_diff != 0) { - flb_debug("[input chunk] update output instances with new chunk size diff=%zd, records=%zu, input=%s", - real_diff, n_records, flb_input_name(in)); - flb_input_chunk_update_output_instances(ic, real_diff); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ic->trace) { - flb_chunk_trace_pre_output(ic->trace); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - flb_input_chunk_protect(in); - return 0; -} - -static void destroy_chunk_raw(struct input_chunk_raw *cr) -{ - if (cr->buf_data) { - flb_free(cr->buf_data); - } - - if (cr->tag) { - flb_sds_destroy(cr->tag); - } - - flb_free(cr); -} - -static int append_to_ring_buffer(struct flb_input_instance *ins, - int event_type, - size_t records, - const char *tag, - size_t tag_len, - const void *buf, - size_t buf_size) - -{ - int ret; - int retries = 0; - int retry_limit = 10; - struct input_chunk_raw *cr; - - cr = flb_calloc(1, sizeof(struct input_chunk_raw)); - if (!cr) { - flb_errno(); - return -1; - } - cr->ins = ins; - cr->event_type = event_type; - - if (tag && tag_len > 0) { - cr->tag = flb_sds_create_len(tag, tag_len); - if (!cr->tag) { - flb_free(cr); - return -1; - } - } - else { - cr->tag = NULL; - } - - cr->records = records; - cr->buf_data = flb_malloc(buf_size); - if (!cr->buf_data) { - flb_errno(); - destroy_chunk_raw(cr); - return -1; - } - - /* - * this memory copy is just a simple overhead, the problem we have is that - * input instances always assume that they have to release their buffer since - * the append raw operation already did a copy. Not a big issue but maybe this - * is a tradeoff... - */ - memcpy(cr->buf_data, buf, buf_size); - cr->buf_size = buf_size; - - - -retry: - /* - * There is a little chance that the ring buffer is full or due to saturation - * from the main thread the data is not being consumed. On this scenario we - * retry up to 'retry_limit' times with a little wait time. - */ - if (retries >= retry_limit) { - flb_plg_error(ins, "could not enqueue records into the ring buffer"); - destroy_chunk_raw(cr); - return -1; - } - - /* append chunk raw context to the ring buffer */ - ret = flb_ring_buffer_write(ins->rb, (void *) &cr, sizeof(cr)); - if (ret == -1) { - flb_plg_debug(ins, "failed buffer write, retries=%i\n", - retries); - - /* sleep for 100000 microseconds (100 milliseconds) */ - usleep(100000); - retries++; - goto retry; - } - - return 0; -} - -/* iterate input instance ring buffer and remove any enqueued input_chunk_raw */ -void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins) -{ - int ret; - struct input_chunk_raw *cr; - - if (!ins->rb) { - return; - } - - while ((ret = flb_ring_buffer_read(ins->rb, (void *) &cr, sizeof(cr))) == 0) { - if (cr) { - destroy_chunk_raw(cr); - cr = NULL; - } - } -} - -void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) -{ - int ret; - int tag_len = 0; - struct mk_list *head; - struct flb_input_instance *ins; - struct input_chunk_raw *cr; - - mk_list_foreach(head, &ctx->inputs) { - ins = mk_list_entry(head, struct flb_input_instance, _head); - cr = NULL; - - while (1) { - if (flb_input_buf_paused(ins) == FLB_TRUE) { - break; - } - - ret = flb_ring_buffer_read(ins->rb, - (void *) &cr, - sizeof(cr)); - if (ret != 0) { - break; - } - - if (cr) { - if (cr->tag) { - tag_len = flb_sds_len(cr->tag); - } - else { - tag_len = 0; - } - - input_chunk_append_raw(cr->ins, cr->event_type, cr->records, - cr->tag, tag_len, - cr->buf_data, cr->buf_size); - destroy_chunk_raw(cr); - } - cr = NULL; - } - - ins->rb->flush_pending = FLB_FALSE; - } -} - -int flb_input_chunk_append_raw(struct flb_input_instance *in, - int event_type, - size_t records, - const char *tag, size_t tag_len, - const void *buf, size_t buf_size) -{ - int ret; - - /* - * If the plugin instance registering the data runs in a separate thread, we must - * add the data reference to the ring buffer. - */ - if (flb_input_is_threaded(in)) { - ret = append_to_ring_buffer(in, event_type, records, - tag, tag_len, - buf, buf_size); - } - else { - ret = input_chunk_append_raw(in, event_type, records, - tag, tag_len, buf, buf_size); - } - - return ret; -} - -/* Retrieve a raw buffer from a dyntag node */ -const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size) -{ - int ret; - size_t pre_size; - size_t post_size; - ssize_t diff_size; - char *buf = NULL; - - pre_size = flb_input_chunk_get_real_size(ic); - - if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { - ret = cio_chunk_up(ic->chunk); - if (ret == -1) { - return NULL; - } - } - - /* Lock the internal chunk - * - * This operation has to be performed before getting the chunk data - * pointer because in certain situations it could cause the chunk - * mapping to be relocated (ie. macos / windows on trim) - */ - cio_chunk_lock(ic->chunk); - - /* - * msgpack-c internal use a raw buffer for it operations, since we - * already appended data we just can take out the references to avoid - * a new memory allocation and skip a copy operation. - */ - ret = cio_chunk_get_content(ic->chunk, &buf, size); - - if (ret == -1) { - flb_error("[input chunk] error retrieving chunk content"); - return NULL; - } - - if (!buf) { - *size = 0; - return NULL; - } - - /* Set it busy as it likely it's a reference for an outgoing task */ - ic->busy = FLB_TRUE; - - post_size = flb_input_chunk_get_real_size(ic); - if (post_size != pre_size) { - diff_size = post_size - pre_size; - flb_input_chunk_update_output_instances(ic, diff_size); - } - return buf; -} - -int flb_input_chunk_release_lock(struct flb_input_chunk *ic) -{ - if (ic->busy == FLB_FALSE) { - return -1; - } - - ic->busy = FLB_FALSE; - return 0; -} - -flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic) -{ - struct cio_chunk *ch; - - ch = (struct cio_chunk *) ic->chunk; - return ch->name; -} - -static inline int input_chunk_has_magic_bytes(char *buf, int len) -{ - unsigned char *p; - - if (len < FLB_INPUT_CHUNK_META_HEADER) { - return FLB_FALSE; - } - - p = (unsigned char *) buf; - if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 && - p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1 && p[3] == 0) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* - * Get the event type by retrieving metadata header. NOTE: this function only event type discovery by looking at the - * headers bytes of a chunk that exists on disk. - */ -int flb_input_chunk_get_event_type(struct flb_input_chunk *ic) -{ - int len; - int ret; - int type = -1; - char *buf = NULL; - - ret = cio_meta_read(ic->chunk, &buf, &len); - if (ret == -1) { - return -1; - } - - /* Check metadata header / magic bytes */ - if (input_chunk_has_magic_bytes(buf, len)) { - if (buf[2] == FLB_INPUT_CHUNK_TYPE_LOGS) { - type = FLB_INPUT_LOGS; - } - else if (buf[2] == FLB_INPUT_CHUNK_TYPE_METRICS) { - type = FLB_INPUT_METRICS; - } - else if (buf[2] == FLB_INPUT_CHUNK_TYPE_TRACES) { - type = FLB_INPUT_TRACES; - } - } - else { - type = FLB_INPUT_LOGS; - } - - - return type; -} - -int flb_input_chunk_get_tag(struct flb_input_chunk *ic, - const char **tag_buf, int *tag_len) -{ - int len; - int ret; - char *buf; - - ret = cio_meta_read(ic->chunk, &buf, &len); - if (ret == -1) { - *tag_len = -1; - *tag_buf = NULL; - return -1; - } - - /* If magic bytes exists, just set the offset */ - if (input_chunk_has_magic_bytes(buf, len)) { - *tag_len = len - FLB_INPUT_CHUNK_META_HEADER; - *tag_buf = buf + FLB_INPUT_CHUNK_META_HEADER; - } - else { - /* Old Chunk version without magic bytes */ - *tag_len = len; - *tag_buf = buf; - } - - return ret; -} - -/* - * Iterates all output instances that the chunk will be flushing to and summarize - * the total number of bytes in use after ingesting the new data. - */ -void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, - size_t chunk_size) -{ - struct mk_list *head; - struct flb_output_instance *o_ins; - - /* for each output plugin, we update the fs_chunks_size */ - mk_list_foreach(head, &ic->in->config->outputs) { - o_ins = mk_list_entry(head, struct flb_output_instance, _head); - if (o_ins->total_limit_size == -1) { - continue; - } - - if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { - /* - * if there is match on any index of 1's in the binary, it indicates - * that the input chunk will flush to this output instance - */ - FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, chunk_size); - o_ins->fs_chunks_size += chunk_size; - ic->fs_counted = FLB_TRUE; - - flb_debug("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, " - "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), - o_ins->name, chunk_size, o_ins->fs_chunks_size); - } - } -} |