diff options
Diffstat (limited to 'fluent-bit/src/flb_input_chunk.c')
-rw-r--r-- | fluent-bit/src/flb_input_chunk.c | 2009 |
1 files changed, 2009 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_input_chunk.c b/fluent-bit/src/flb_input_chunk.c new file mode 100644 index 000000000..c71ae3ef0 --- /dev/null +++ b/fluent-bit/src/flb_input_chunk.c @@ -0,0 +1,2009 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define FS_CHUNK_SIZE_DEBUG(op) {flb_trace("[%d] %s -> fs_chunks_size = %zu", \ + __LINE__, op->name, op->fs_chunks_size);} +#define FS_CHUNK_SIZE_DEBUG_MOD(op, chunk, mod) {flb_trace( \ + "[%d] %s -> fs_chunks_size = %zu mod=%zd chunk=%s", __LINE__, \ + op->name, op->fs_chunks_size, mod, flb_input_chunk_get_name(chunk));} + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_chunk.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_storage.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_router.h> +#include <fluent-bit/flb_task.h> +#include <fluent-bit/flb_routes_mask.h> +#include <fluent-bit/flb_metrics.h> +#include <fluent-bit/stream_processor/flb_sp.h> +#include <fluent-bit/flb_ring_buffer.h> +#include <chunkio/chunkio.h> +#include <monkey/mk_core.h> + + +#ifdef FLB_HAVE_CHUNK_TRACE +#include <fluent-bit/flb_chunk_trace.h> +#endif /* FLB_HAVE_CHUNK_TRACE */ + + +#define BLOCK_UNTIL_KEYPRESS() {char temp_keypress_buffer; 
read(0, &temp_keypress_buffer, 1);} + +#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL 0 +#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1 + +struct input_chunk_raw { + struct flb_input_instance *ins; + int event_type; + size_t records; + flb_sds_t tag; + void *buf_data; + size_t buf_size; +}; + +#ifdef FLB_HAVE_IN_STORAGE_BACKLOG + +extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, + size_t required_space); + +extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin, + ssize_t *required_space); + + +#else + +ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, + size_t required_space) +{ + return 0; +} + +int sb_release_output_queue_space(struct flb_output_instance *output_plugin, + ssize_t *required_space) +{ + return 0; +} + +#endif + +static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, + struct flb_input_chunk *old_ic, + uint64_t o_id); + +static int flb_input_chunk_is_task_safe_delete(struct flb_task *task); + +static int flb_input_chunk_drop_task_route( + struct flb_task *task, + struct flb_output_instance *o_ins); + +static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); + +static int flb_input_chunk_release_space( + struct flb_input_chunk *new_input_chunk, + struct flb_input_instance *input_plugin, + struct flb_output_instance *output_plugin, + ssize_t *required_space, + int release_scope) +{ + struct mk_list *input_chunk_iterator_tmp; + struct mk_list *input_chunk_iterator; + int chunk_destroy_flag; + struct flb_input_chunk *old_input_chunk; + ssize_t released_space; + int chunk_released; + ssize_t chunk_size; + + released_space = 0; + + mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, + &input_plugin->chunks) { + old_input_chunk = mk_list_entry(input_chunk_iterator, + struct flb_input_chunk, _head); + + if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, + output_plugin->id)) { + continue; + } + + 
if (flb_input_chunk_safe_delete(new_input_chunk, + old_input_chunk, + output_plugin->id) == FLB_FALSE) { + continue; + } + + if (flb_input_chunk_drop_task_route(old_input_chunk->task, + output_plugin) == FLB_FALSE) { + continue; + } + + chunk_size = flb_input_chunk_get_real_size(old_input_chunk); + chunk_released = FLB_FALSE; + chunk_destroy_flag = FLB_FALSE; + + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { + flb_routes_mask_clear_bit(old_input_chunk->routes_mask, + output_plugin->id); + + FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); + output_plugin->fs_chunks_size -= chunk_size; + + chunk_destroy_flag = flb_routes_mask_is_empty( + old_input_chunk->routes_mask); + + chunk_released = FLB_TRUE; + } + else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { + chunk_destroy_flag = FLB_TRUE; + } + + if (chunk_destroy_flag) { + if (old_input_chunk->task != NULL) { + /* + * If the chunk is referenced by a task and task has no active route, + * we need to destroy the task as well. 
+ */ + if (old_input_chunk->task->users == 0) { + flb_debug("[task] drop task_id %d with no active route from input plugin %s", + old_input_chunk->task->id, new_input_chunk->in->name); + flb_task_destroy(old_input_chunk->task, FLB_TRUE); + + chunk_released = FLB_TRUE; + } + } + else { + flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", + flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); + + flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); + + chunk_released = FLB_TRUE; + } + } + + if (chunk_released) { + released_space += chunk_size; + } + + if (released_space >= *required_space) { + break; + } + } + + *required_space -= released_space; + + return 0; +} + +static void generate_chunk_name(struct flb_input_instance *in, + char *out_buf, int buf_size) +{ + struct flb_time tm; + (void) in; + + flb_time_get(&tm); + snprintf(out_buf, buf_size - 1, + "%i-%lu.%4lu.flb", + getpid(), + tm.tm.tv_sec, tm.tm.tv_nsec); +} + +ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic) +{ + return cio_chunk_get_content_size(ic->chunk); +} + +/* + * When chunk is set to DOWN from memory, data_size is set to 0 and + * cio_chunk_get_content_size(1) returns the data_size. 
fs_chunks_size + * is used to track the size of chunks in filesystem so we need to call + * cio_chunk_get_real_size to return the original size in the file system + */ +static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) +{ + ssize_t meta_size; + ssize_t size; + + size = cio_chunk_get_real_size(ic->chunk); + + if (size != 0) { + return size; + } + + // Real size is not synced to chunk yet + size = flb_input_chunk_get_size(ic); + if (size == 0) { + flb_debug("[input chunk] no data in the chunk %s", + flb_input_chunk_get_name(ic)); + return -1; + } + + meta_size = cio_meta_size(ic->chunk); + size += meta_size + /* See https://github.com/edsiper/chunkio#file-layout for more details */ + + 2 /* HEADER BYTES */ + + 4 /* CRC32 */ + + 16 /* PADDING */ + + 2; /* METADATA LENGTH BYTES */ + + return size; +} + +int flb_input_chunk_write(void *data, const char *buf, size_t len) +{ + int ret; + struct flb_input_chunk *ic; + + ic = (struct flb_input_chunk *) data; + + ret = cio_chunk_write(ic->chunk, buf, len); + return ret; +} + +int flb_input_chunk_write_at(void *data, off_t offset, + const char *buf, size_t len) +{ + int ret; + struct flb_input_chunk *ic; + + ic = (struct flb_input_chunk *) data; + + ret = cio_chunk_write_at(ic->chunk, offset, buf, len); + return ret; +} + +static int flb_input_chunk_drop_task_route( + struct flb_task *task, + struct flb_output_instance *output_plugin) +{ + int route_status; + int result; + + if (task == NULL) { + return FLB_TRUE; + } + + result = FLB_TRUE; + + if (task->users != 0) { + result = FLB_FALSE; + + if (output_plugin != NULL) { + flb_task_acquire_lock(task); + + route_status = flb_task_get_route_status(task, output_plugin); + + if (route_status == FLB_TASK_ROUTE_INACTIVE) { + flb_task_set_route_status(task, + output_plugin, + FLB_TASK_ROUTE_DROPPED); + + result = FLB_TRUE; + } + + flb_task_release_lock(task); + } + } + + return result; +} + + +/* + * For input_chunk referenced by an outgoing task, we need to 
check + * whether the chunk is in the middle of output flush callback + */ +static int flb_input_chunk_is_task_safe_delete(struct flb_task *task) +{ + if (!task) { + return FLB_TRUE; + } + + if (task->users != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, + struct flb_input_chunk *old_ic, + uint64_t o_id) +{ + /* The chunk we want to drop should not be the incoming chunk */ + if (ic == old_ic) { + return FLB_FALSE; + } + + /* + * Even if chunks from same input plugin have same routes_mask when created, + * the routes_mask could be modified when new chunks is ingested. Therefore, + * we still need to do the validation on the routes_mask with o_id. + */ + if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +int flb_input_chunk_release_space_compound( + struct flb_input_chunk *new_input_chunk, + struct flb_output_instance *output_plugin, + size_t *local_release_requirement, + int release_local_space) +{ + ssize_t required_space_remainder; + struct flb_input_instance *storage_backlog_instance; + struct flb_input_instance *input_plugin_instance; + struct mk_list *iterator; + int result; + + storage_backlog_instance = output_plugin->config->storage_input_plugin; + + *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk); + required_space_remainder = (ssize_t) *local_release_requirement; + + if (required_space_remainder > 0) { + result = flb_input_chunk_release_space(new_input_chunk, + storage_backlog_instance, + output_plugin, + &required_space_remainder, + FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL); + } + + if (required_space_remainder > 0) { + result = sb_release_output_queue_space(output_plugin, + &required_space_remainder); + } + + if (release_local_space) { + if (required_space_remainder > 0) { + result = flb_input_chunk_release_space(new_input_chunk, + new_input_chunk->in, + output_plugin, + 
&required_space_remainder, + FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL); + } + } + + if (required_space_remainder > 0) { + mk_list_foreach(iterator, &output_plugin->config->inputs) { + input_plugin_instance = \ + mk_list_entry(iterator, struct flb_input_instance, _head); + + if (input_plugin_instance != new_input_chunk->in) { + result = flb_input_chunk_release_space( + new_input_chunk, + input_plugin_instance, + output_plugin, + &required_space_remainder, + FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL); + } + + if (required_space_remainder <= 0) { + break; + } + } + } + + if (required_space_remainder < 0) { + required_space_remainder = 0; + } + + *local_release_requirement = (size_t) required_space_remainder; + + (void) result; + + return 0; +} + +/* + * Find a slot in the output instance to append the new data with size chunk_size, it + * will drop the the oldest chunks when the limitation on local disk is reached. + */ +int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, + size_t chunk_size, int overlimit) +{ + int count; + int result; + struct mk_list *head; + struct flb_output_instance *o_ins; + size_t local_release_requirement; + + /* + * For each output instances that will be over the limit after adding the new chunk, + * we have to determine how many chunks needs to be removed. We will adjust the + * routes_mask to only route to the output plugin that have enough space after + * deleting some chunks fome the queue. 
+ */ + count = 0; + + mk_list_foreach(head, &ic->in->config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 || + (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) { + continue; + } + + local_release_requirement = 0; + + result = flb_input_chunk_release_space_compound( + ic, o_ins, + &local_release_requirement, + FLB_TRUE); + + if (result != 0 || + local_release_requirement != 0) { + count++; + } + } + + if (count != 0) { + flb_error("[input chunk] fail to drop enough chunks in order to place new data"); + exit(0); + } + + return 0; +} + +/* + * Returns a non-zero result if any output instances will reach the limit + * after buffering the new data + */ +int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic, + size_t chunk_size) +{ + int overlimit = 0; + struct mk_list *head; + struct flb_output_instance *o_ins; + + mk_list_foreach(head, &ic->in->config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + if ((o_ins->total_limit_size == -1) || + (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) { + continue; + } + + FS_CHUNK_SIZE_DEBUG(o_ins); + flb_debug("[input chunk] chunk %s required %ld bytes and %ld bytes left " + "in plugin %s", flb_input_chunk_get_name(ic), chunk_size, + o_ins->total_limit_size - + o_ins->fs_backlog_chunks_size - + o_ins->fs_chunks_size, + o_ins->name); + + if ((o_ins->fs_chunks_size + + o_ins->fs_backlog_chunks_size + + chunk_size) > o_ins->total_limit_size) { + overlimit |= (1 << o_ins->id); + } + } + + return overlimit; +} + +/* Find a slot for the incoming data to buffer it in local file system + * returns 0 if none of the routes can be written to + */ +int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size) +{ + int overlimit; + overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size); + if (overlimit != 0) { + 
flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit); + } + + return !flb_routes_mask_is_empty(ic->routes_mask); +} + +/* Create an input chunk using a Chunk I/O */ +struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, + int event_type, + void *chunk) +{ + int records = 0; + int tag_len; + int has_routes; + int ret; + uint64_t ts; + char *buf_data; + size_t buf_size; + size_t offset; + ssize_t bytes; + const char *tag_buf; + struct flb_input_chunk *ic; + + /* Create context for the input instance */ + ic = flb_calloc(1, sizeof(struct flb_input_chunk)); + if (!ic) { + flb_errno(); + return NULL; + } + ic->event_type = event_type; + ic->busy = FLB_FALSE; + ic->fs_counted = FLB_FALSE; + ic->fs_backlog = FLB_TRUE; + ic->chunk = chunk; + ic->in = in; + msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write); + + ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size); + if (ret != CIO_OK) { + flb_error("[input chunk] error retrieving content for metrics"); + flb_free(ic); + return NULL; + } + + if (ic->event_type == FLB_INPUT_LOGS) { + /* Validate records in the chunk */ + ret = flb_mp_validate_log_chunk(buf_data, buf_size, &records, &offset); + if (ret == -1) { + /* If there are valid records, truncate the chunk size */ + if (records <= 0) { + flb_plg_error(in, + "chunk validation failed, data might be corrupted. " + "No valid records found, the chunk will be discarded."); + flb_free(ic); + return NULL; + } + if (records > 0 && offset > 32) { + flb_plg_warn(in, + "chunk validation failed, data might be corrupted. " + "Found %d valid records, failed content starts " + "right after byte %lu. Recovering valid records.", + records, offset); + + /* truncate the chunk to recover valid records */ + cio_chunk_write_at(chunk, offset, NULL, 0); + } + else { + flb_plg_error(in, + "chunk validation failed, data might be corrupted. " + "Found %d valid records, failed content starts " + "right after byte %lu. 
Cannot recover chunk,", + records, offset); + flb_free(ic); + return NULL; + } + } + } + else if (ic->event_type == FLB_INPUT_METRICS) { + ret = flb_mp_validate_metric_chunk(buf_data, buf_size, &records, &offset); + if (ret == -1) { + if (records <= 0) { + flb_plg_error(in, + "metrics chunk validation failed, data might be corrupted. " + "No valid records found, the chunk will be discarded."); + flb_free(ic); + return NULL; + } + if (records > 0 && offset > 32) { + flb_plg_warn(in, + "metrics chunk validation failed, data might be corrupted. " + "Found %d valid records, failed content starts " + "right after byte %lu. Recovering valid records.", + records, offset); + + /* truncate the chunk to recover valid records */ + cio_chunk_write_at(chunk, offset, NULL, 0); + } + else { + flb_plg_error(in, + "metrics chunk validation failed, data might be corrupted. " + "Found %d valid records, failed content starts " + "right after byte %lu. Cannot recover chunk,", + records, offset); + flb_free(ic); + return NULL; + } + + } + } + else if (ic->event_type == FLB_INPUT_TRACES) { + + } + + /* Skip chunks without content data */ + if (records == 0) { + flb_plg_error(in, + "chunk validation failed, data might be corrupted. " + "No valid records found, the chunk will be discarded."); + flb_free(ic); + return NULL; + } + + /* + * If the content is valid and the chunk has extra padding zeros, just + * perform an adjustment. 
+ */ + bytes = cio_chunk_get_content_size(chunk); + if (bytes == -1) { + flb_free(ic); + return NULL; + } + if (offset < bytes) { + cio_chunk_write_at(chunk, offset, NULL, 0); + } + + /* Update metrics */ +#ifdef FLB_HAVE_METRICS + ic->total_records = records; + if (ic->total_records > 0) { + /* timestamp */ + ts = cfl_time_now(); + + /* fluentbit_input_records_total */ + cmt_counter_add(in->cmt_records, ts, ic->total_records, + 1, (char *[]) {(char *) flb_input_name(in)}); + + /* fluentbit_input_bytes_total */ + cmt_counter_add(in->cmt_bytes, ts, buf_size, + 1, (char *[]) {(char *) flb_input_name(in)}); + + /* OLD metrics */ + flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); + } +#endif + + /* Get the the tag reference (chunk metadata) */ + ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len); + if (ret == -1) { + flb_error("[input chunk] error retrieving tag of input chunk"); + flb_free(ic); + return NULL; + } + + bytes = flb_input_chunk_get_real_size(ic); + if (bytes < 0) { + flb_warn("[input chunk] could not retrieve chunk real size"); + flb_free(ic); + return NULL; + } + + has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in); + if (has_routes == 0) { + flb_warn("[input chunk] no matching route for backoff log chunk %s", + flb_input_chunk_get_name(ic)); + } + + mk_list_add(&ic->_head, &in->chunks); + + flb_input_chunk_update_output_instances(ic, bytes); + + return ic; +} + +static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, + char *tag, int tag_len) + +{ + int ret; + int meta_size; + char *meta; + + /* + * Prepare the Chunk metadata header + * ---------------------------------- + * m[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0 + * m[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1 + * m[2] = type (FLB_INPUT_CHUNK_TYPE_LOG or FLB_INPUT_CHUNK_TYPE_METRIC or FLB_INPUT_CHUNK_TYPE_TRACE + * m[3] = 0 (unused for now) + */ + + /* write metadata (tag) */ + if 
(tag_len > (65535 - FLB_INPUT_CHUNK_META_HEADER)) { + /* truncate length */ + tag_len = 65535 - FLB_INPUT_CHUNK_META_HEADER; + } + meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len; + + /* Allocate buffer for metadata header */ + meta = flb_calloc(1, meta_size); + if (!meta) { + flb_errno(); + return -1; + } + + /* + * Write chunk header in a temporary buffer + * ---------------------------------------- + */ + + /* magic bytes */ + meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0; + meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1; + + /* event type */ + if (event_type == FLB_INPUT_LOGS) { + meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS; + } + else if (event_type == FLB_INPUT_METRICS) { + meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS; + } + else if (event_type == FLB_INPUT_TRACES) { + meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES; + } + + /* unused byte */ + meta[3] = 0; + + /* copy the tag after magic bytes */ + memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len); + + /* Write tag into metadata section */ + ret = cio_meta_write(chunk, (char *) meta, meta_size); + if (ret == -1) { + flb_error("[input chunk] could not write metadata"); + flb_free(meta); + return -1; + } + flb_free(meta); + + return 0; +} + +struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len) +{ + int ret; + int err; + int set_down = FLB_FALSE; + int has_routes; + char name[64]; + struct cio_chunk *chunk; + struct flb_storage_input *storage; + struct flb_input_chunk *ic; + + storage = in->storage; + + /* chunk name */ + generate_chunk_name(in, name, sizeof(name) - 1); + + /* open/create target chunk file */ + chunk = cio_chunk_open(storage->cio, storage->stream, name, + CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err); + if (!chunk) { + flb_error("[input chunk] could not create chunk file: %s:%s", + storage->stream->name, name); + return NULL; + } + /* + * If the returned chunk at open is 'down', just put it up, write the + * content and set it down again. 
+ */ + ret = cio_chunk_is_up(chunk); + if (ret == CIO_FALSE) { + ret = cio_chunk_up_force(chunk); + if (ret == -1) { + cio_chunk_close(chunk, CIO_TRUE); + return NULL; + } + set_down = FLB_TRUE; + } + + /* Write chunk header */ + ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len); + if (ret == -1) { + cio_chunk_close(chunk, CIO_TRUE); + return NULL; + } + + /* Create context for the input instance */ + ic = flb_calloc(1, sizeof(struct flb_input_chunk)); + if (!ic) { + flb_errno(); + cio_chunk_close(chunk, CIO_TRUE); + return NULL; + } + + /* + * Check chunk content type to be created: depending of the value set by + * the input plugin, this can be FLB_INPUT_LOGS, FLB_INPUT_METRICS or + * FLB_INPUT_TRACES. + */ + ic->event_type = event_type; + ic->busy = FLB_FALSE; + ic->fs_counted = FLB_FALSE; + ic->chunk = chunk; + ic->fs_backlog = FLB_FALSE; + ic->in = in; + ic->stream_off = 0; + ic->task = NULL; +#ifdef FLB_HAVE_METRICS + ic->total_records = 0; +#endif + + /* Calculate the routes_mask for the input chunk */ + has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in); + if (has_routes == 0) { + flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'", + flb_input_chunk_get_name(ic), tag); + } + + msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write); + mk_list_add(&ic->_head, &in->chunks); + + if (set_down == FLB_TRUE) { + cio_chunk_down(chunk); + } + + if (event_type == FLB_INPUT_LOGS) { + flb_hash_table_add(in->ht_log_chunks, tag, tag_len, ic, 0); + } + else if (event_type == FLB_INPUT_METRICS) { + flb_hash_table_add(in->ht_metric_chunks, tag, tag_len, ic, 0); + } + else if (event_type == FLB_INPUT_TRACES) { + flb_hash_table_add(in->ht_trace_chunks, tag, tag_len, ic, 0); + } + + return ic; +} + +int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, + const char *tag_buf, int tag_len, + int del) +{ + ssize_t bytes; + struct mk_list *head; + struct flb_output_instance *o_ins; + + 
mk_list_foreach(head, &ic->in->config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + if (o_ins->total_limit_size == -1) { + continue; + } + + bytes = flb_input_chunk_get_real_size(ic); + if (bytes == -1) { + // no data in the chunk + continue; + } + + if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { + if (ic->fs_counted == FLB_TRUE) { + FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); + o_ins->fs_chunks_size -= bytes; + flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, " + "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), + bytes, o_ins->name, o_ins->fs_chunks_size); + } + } + } + + if (del == CIO_TRUE && tag_buf) { + /* + * "TRY" to delete any reference to this chunk ('ic') from the hash + * table. Note that maybe the value is not longer available in the + * entries if it was replaced: note that we always keep the last + * chunk for a specific Tag. + */ + if (ic->event_type == FLB_INPUT_LOGS) { + flb_hash_table_del_ptr(ic->in->ht_log_chunks, + tag_buf, tag_len, (void *) ic); + } + else if (ic->event_type == FLB_INPUT_METRICS) { + flb_hash_table_del_ptr(ic->in->ht_metric_chunks, + tag_buf, tag_len, (void *) ic); + } + else if (ic->event_type == FLB_INPUT_TRACES) { + flb_hash_table_del_ptr(ic->in->ht_trace_chunks, + tag_buf, tag_len, (void *) ic); + } + } + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ic->trace != NULL) { + flb_chunk_trace_destroy(ic->trace); + } +#endif /* FLB_HAVE_CHUNK_TRACE */ + + cio_chunk_close(ic->chunk, del); + mk_list_del(&ic->_head); + flb_free(ic); + + return 0; +} + + +int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) +{ + int tag_len; + int ret; + ssize_t bytes; + const char *tag_buf = NULL; + struct mk_list *head; + struct flb_output_instance *o_ins; + + if (flb_input_chunk_is_up(ic) == FLB_FALSE) { + flb_input_chunk_set_up(ic); + } + + mk_list_foreach(head, &ic->in->config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, 
_head); + + if (o_ins->total_limit_size == -1) { + continue; + } + + bytes = flb_input_chunk_get_real_size(ic); + if (bytes == -1) { + // no data in the chunk + continue; + } + + if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { + if (ic->fs_counted == FLB_TRUE) { + FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); + o_ins->fs_chunks_size -= bytes; + flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, " + "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), + bytes, o_ins->name, o_ins->fs_chunks_size); + } + } + } + + /* + * When a chunk is going to be destroyed, this can be in a down state, + * since the next step is to retrieve the Tag we need to have the + * content up. + */ + ret = flb_input_chunk_is_up(ic); + if (ret == FLB_FALSE) { + ret = cio_chunk_up_force(ic->chunk); + if (ret == -1) { + flb_error("[input chunk] cannot load chunk: %s", + flb_input_chunk_get_name(ic)); + } + } + + /* Retrieve Tag */ + ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len); + if (ret == -1) { + flb_trace("[input chunk] could not retrieve chunk tag: %s", + flb_input_chunk_get_name(ic)); + } + + if (del == CIO_TRUE && tag_buf) { + /* + * "TRY" to delete any reference to this chunk ('ic') from the hash + * table. Note that maybe the value is not longer available in the + * entries if it was replaced: note that we always keep the last + * chunk for a specific Tag. 
+ */ + if (ic->event_type == FLB_INPUT_LOGS) { + flb_hash_table_del_ptr(ic->in->ht_log_chunks, + tag_buf, tag_len, (void *) ic); + } + else if (ic->event_type == FLB_INPUT_METRICS) { + flb_hash_table_del_ptr(ic->in->ht_metric_chunks, + tag_buf, tag_len, (void *) ic); + } + else if (ic->event_type == FLB_INPUT_TRACES) { + flb_hash_table_del_ptr(ic->in->ht_trace_chunks, + tag_buf, tag_len, (void *) ic); + } + } + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ic->trace != NULL) { + flb_chunk_trace_destroy(ic->trace); + } +#endif /* FLB_HAVE_CHUNK_TRACE */ + + cio_chunk_close(ic->chunk, del); + mk_list_del(&ic->_head); + flb_free(ic); + + return 0; +} + +/* Return or create an available chunk to write data */ +static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, + int event_type, + const char *tag, int tag_len, + size_t chunk_size, int *set_down) +{ + int id = -1; + int ret; + int new_chunk = FLB_FALSE; + size_t out_size; + struct flb_input_chunk *ic = NULL; + + if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) { + flb_plg_warn(in, + "Tag set exceeds limit, truncating from %i to %i bytes", + tag_len, FLB_INPUT_CHUNK_TAG_MAX); + tag_len = FLB_INPUT_CHUNK_TAG_MAX; + } + + if (event_type == FLB_INPUT_LOGS) { + id = flb_hash_table_get(in->ht_log_chunks, tag, tag_len, + (void *) &ic, &out_size); + } + else if (event_type == FLB_INPUT_METRICS) { + id = flb_hash_table_get(in->ht_metric_chunks, tag, tag_len, + (void *) &ic, &out_size); + } + else if (event_type == FLB_INPUT_TRACES) { + id = flb_hash_table_get(in->ht_trace_chunks, tag, tag_len, + (void *) &ic, &out_size); + } + + if (id >= 0) { + if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) { + ic = NULL; + } + else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { + ret = cio_chunk_up_force(ic->chunk); + + if (ret == CIO_CORRUPTED) { + if (in->config->storage_del_bad_chunks) { + /* If the chunk is corrupted we need to discard it and + * set ic to NULL so the system tries to allocate a new + * chunk. 
+ */ + + flb_error("[input chunk] discarding corrupted chunk"); + } + + flb_input_chunk_destroy_corrupted(ic, + tag, tag_len, + in->config->storage_del_bad_chunks); + + ic = NULL; + } + else if (ret != CIO_OK) { + ic = NULL; + } + + *set_down = FLB_TRUE; + } + } + + /* No chunk was found, we need to create a new one */ + if (!ic) { + ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len); + new_chunk = FLB_TRUE; + if (!ic) { + return NULL; + } + ic->event_type = event_type; + } + + /* + * If buffering this block of data will exceed one of the limit among all output instances + * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks + * (based in creation time) to get enough space for the incoming chunk. + */ + if (!flb_routes_mask_is_empty(ic->routes_mask) + && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { + /* + * If the chunk is not newly created, the chunk might already have logs inside. + * We cannot delete (reused) chunks here. + * If the routes_mask is cleared after trying to append new data, we destroy + * the chunk. + */ + if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) { + flb_input_chunk_destroy(ic, FLB_TRUE); + } + return NULL; + } + + return ic; +} + +static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i) +{ + if (i->mem_buf_limit <= 0) { + return FLB_FALSE; + } + + if (i->mem_chunks_size >= i->mem_buf_limit) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i) +{ + struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; + + if (storage->type == FLB_STORAGE_FS) { + if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { + if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + +/* + * Check all chunks associated to the input instance and summarize + * the number of bytes in use. 
+ */ +size_t flb_input_chunk_total_size(struct flb_input_instance *in) +{ + size_t total = 0; + struct flb_storage_input *storage; + + storage = (struct flb_storage_input *) in->storage; + total = cio_stream_size_chunks_up(storage->stream); + return total; +} + +/* + * Count and update the number of bytes being used by the instance. Also + * check if the instance is paused, if so, check if it can be resumed if + * is not longer over the limits. + * + * It always returns the number of bytes in use. + */ +size_t flb_input_chunk_set_limits(struct flb_input_instance *in) +{ + size_t total; + + /* Gather total number of enqueued bytes */ + total = flb_input_chunk_total_size(in); + + /* Register the total into the context variable */ + in->mem_chunks_size = total; + + /* + * After the adjustments, validate if the plugin is overlimit or paused + * and perform further adjustments. + */ + if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE && + in->config->is_running == FLB_TRUE && + in->config->is_ingestion_active == FLB_TRUE && + in->mem_buf_status == FLB_INPUT_PAUSED) { + in->mem_buf_status = FLB_INPUT_RUNNING; + if (in->p->cb_resume) { + flb_input_resume(in); + flb_info("[input] %s resume (mem buf overlimit)", + in->name); + } + } + if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && + in->config->is_running == FLB_TRUE && + in->config->is_ingestion_active == FLB_TRUE && + in->storage_buf_status == FLB_INPUT_PAUSED) { + in->storage_buf_status = FLB_INPUT_RUNNING; + if (in->p->cb_resume) { + flb_input_resume(in); + flb_info("[input] %s resume (storage buf overlimit %zu/%zu)", + in->name, + ((struct flb_storage_input *)in->storage)->cio->total_chunks_up, + ((struct flb_storage_input *)in->storage)->cio->max_chunks_up); + } + } + + return total; +} + +/* + * If the number of bytes in use by the chunks are over the imposed limit + * by configuration, pause the instance. 
+ */ +static inline int flb_input_chunk_protect(struct flb_input_instance *i) +{ + struct flb_storage_input *storage = i->storage; + + if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { + flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", + i->name, + storage->cio->total_chunks_up, + storage->cio->max_chunks_up); + flb_input_pause(i); + i->storage_buf_status = FLB_INPUT_PAUSED; + return FLB_TRUE; + } + + if (storage->type == FLB_STORAGE_FS) { + return FLB_FALSE; + } + + if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { + /* + * if the plugin is already overlimit and the strategy is based on + * a memory-ring-buffer logic, do not pause the plugin, upon next + * try of ingestion 'memrb' will make sure to release some bytes. + */ + if (i->storage_type == FLB_STORAGE_MEMRB) { + return FLB_FALSE; + } + + /* + * The plugin is using 'memory' buffering only and already reached + * it limit, just pause the ingestion. + */ + flb_warn("[input] %s paused (mem buf overlimit)", + i->name); + flb_input_pause(i); + i->mem_buf_status = FLB_INPUT_PAUSED; + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * Validate if the chunk coming from the input plugin based on config and + * resources usage must be 'up' or 'down' (applicable for filesystem storage + * type). + * + * FIXME: can we find a better name for this function ? 
 */
/* Returns FLB_TRUE if the chunk stays 'up', FLB_FALSE if it was put down */
int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
{
    size_t total;
    struct flb_input_instance *in;

    in = ic->in;

    /* Gather total number of enqueued bytes */
    total = flb_input_chunk_total_size(in);

    /* Register the total into the context variable */
    in->mem_chunks_size = total;

    if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) {
        if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
            /* over the memory limit: unmap this chunk to release memory */
            cio_chunk_down(ic->chunk);

            /* Adjust new counters */
            total = flb_input_chunk_total_size(ic->in);
            in->mem_chunks_size = total;

            return FLB_FALSE;
        }
    }

    return FLB_TRUE;
}

/* Return CIO_TRUE when the chunk content is mapped in memory ('up') */
int flb_input_chunk_is_up(struct flb_input_chunk *ic)
{
    return cio_chunk_is_up(ic->chunk);
}

/* Put the chunk 'down' (unmap it) if it is currently 'up' */
int flb_input_chunk_down(struct flb_input_chunk *ic)
{
    if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
        return cio_chunk_down(ic->chunk);
    }

    return 0;
}

/* Bring the chunk 'up' (map it) if it is currently 'down' */
int flb_input_chunk_set_up(struct flb_input_chunk *ic)
{
    if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
        return cio_chunk_up(ic->chunk);
    }

    return 0;
}

/*
 * For 'memrb' storage: walk the instance chunks and destroy deletable ones
 * until at least 'required_space' bytes are released. The chunks/bytes
 * actually dropped are always reported through the output parameters.
 * Returns 0 when enough space was released, -1 otherwise.
 */
static int memrb_input_chunk_release_space(struct flb_input_instance *ins,
                                           size_t required_space,
                                           size_t *dropped_chunks, size_t *dropped_bytes)
{
    int ret;
    int released;
    size_t removed_chunks = 0;
    ssize_t chunk_size;
    ssize_t released_space = 0;
    struct mk_list *tmp;
    struct mk_list *head;
    struct flb_input_chunk *ic;

    mk_list_foreach_safe(head, tmp, &ins->chunks) {
        ic = mk_list_entry(head, struct flb_input_chunk, _head);

        /* check if is there any task or no users associated */
        ret = flb_input_chunk_is_task_safe_delete(ic->task);
        if (ret == FLB_FALSE) {
            continue;
        }

        /* get chunk size */
        chunk_size = flb_input_chunk_get_real_size(ic);

        released = FLB_FALSE;
        if (ic->task != NULL) {
            /* destroying the task also destroys its chunk */
            if (ic->task->users == 0) {
                flb_task_destroy(ic->task, FLB_TRUE);
                released = FLB_TRUE;
            }
        }
        else {
            flb_input_chunk_destroy(ic, FLB_TRUE);
            released = FLB_TRUE;
        }

        if (released) {
            released_space += chunk_size;
            removed_chunks++;
        }

        /*
         * NOTE(review): released_space is ssize_t while required_space is
         * size_t, so this comparison promotes to unsigned; safe only while
         * released_space is non-negative — TODO confirm
         * flb_input_chunk_get_real_size() cannot return a negative value here.
         */
        if (released_space >= required_space) {
            break;
        }
    }

    /* no matter if we succeeded or not, set the counters */
    *dropped_bytes = released_space;
    *dropped_chunks = removed_chunks;

    /* set the final status of the operation */
    if (released_space >= required_space) {
        return 0;
    }

    return -1;
}

/*
 * Append a RAW MessagePack buffer to the input instance: find or create a
 * destination chunk, write the data, run filters/stream processor, update
 * metrics and memory/storage accounting, and enforce limits afterwards.
 * Returns 0 on success, -1 on failure.
 */
static int input_chunk_append_raw(struct flb_input_instance *in,
                                  int event_type,
                                  size_t n_records,
                                  const char *tag, size_t tag_len,
                                  const void *buf, size_t buf_size)
{
    int ret;
    int set_down = FLB_FALSE;
    int min;
    int new_chunk = FLB_FALSE;
    uint64_t ts;
    char *name;
    size_t dropped_chunks;
    size_t dropped_bytes;
    size_t content_size;
    size_t real_diff;
    size_t real_size;
    size_t pre_real_size;
    struct flb_input_chunk *ic;
    struct flb_storage_input *si;

    /* memory ring-buffer checker */
    if (in->storage_type == FLB_STORAGE_MEMRB) {
        /* check if we are overlimit */
        ret = flb_input_chunk_is_mem_overlimit(in);
        if (ret) {
            /* reset counters */
            dropped_chunks = 0;
            dropped_bytes = 0;

            /* try to release 'buf_size' */
            ret = memrb_input_chunk_release_space(in, buf_size,
                                                  &dropped_chunks, &dropped_bytes);

            /* update metrics if required */
            if (dropped_chunks > 0 || dropped_bytes > 0) {
                /* timestamp and input plugin name for label */
                ts = cfl_time_now();
                name = (char *) flb_input_name(in);

                /* update counters */
                cmt_counter_add(in->cmt_memrb_dropped_chunks, ts,
                                dropped_chunks, 1, (char *[]) {name});

                cmt_counter_add(in->cmt_memrb_dropped_bytes, ts,
                                dropped_bytes, 1, (char *[]) {name});
            }

            if (ret != 0) {
                /* we could not allocate the required space, just return */
                return -1;
            }
        }
    }

    /* Check if the input plugin has been paused */
    if (flb_input_buf_paused(in) == FLB_TRUE) {
        flb_debug("[input chunk] %s is paused, cannot append records",
                  in->name);
        return -1;
    }

    if (buf_size == 0) {
        flb_debug("[input chunk] skip ingesting data with 0 bytes");
        return -1;
    }

    /*
     * Some callers might not set a custom tag, on that case just inherit
     * the fixed instance tag or instance name.
     */
    if (!tag) {
        if (in->tag && in->tag_len > 0) {
            tag = in->tag;
            tag_len = in->tag_len;
        }
        else {
            tag = in->name;
            tag_len = strlen(in->name);
        }
    }

    /*
     * Get a target input chunk, can be one with remaining space available
     * or a new one.
     */
    ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down);
    if (!ic) {
        flb_error("[input chunk] no available chunk");
        return -1;
    }

    /* newly created chunk */
    if (flb_input_chunk_get_size(ic) == 0) {
        new_chunk = FLB_TRUE;
    }

    /* We got the chunk, validate if is 'up' or 'down' */
    ret = flb_input_chunk_is_up(ic);
    if (ret == FLB_FALSE) {
        ret = cio_chunk_up_force(ic->chunk);
        if (ret == -1) {
            flb_error("[input chunk] cannot retrieve temporary chunk");
            return -1;
        }
        /* chunk was forced up: remember to put it down once written */
        set_down = FLB_TRUE;
    }

    /*
     * Keep the previous real size to calculate the real size
     * difference for flb_input_chunk_update_output_instances(),
     * use 0 when the chunk is new since it's size will never
     * have been calculated before.
     */
    if (new_chunk == FLB_TRUE) {
        pre_real_size = 0;
    }
    else {
        pre_real_size = flb_input_chunk_get_real_size(ic);
    }

    /* Write the new data */
    ret = flb_input_chunk_write(ic, buf, buf_size);
    if (ret == -1) {
        flb_error("[input chunk] error writing data from %s instance",
                  in->name);
        cio_chunk_tx_rollback(ic->chunk);
        return -1;
    }

#ifdef FLB_HAVE_CHUNK_TRACE
    flb_chunk_trace_do_input(ic);
#endif /* FLB_HAVE_CHUNK_TRACE */

    /* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
    if (ret == CIO_OK) {
        ic->added_records = n_records;
        ic->total_records += n_records;
    }

    if (ic->total_records > 0) {
        /* timestamp */
        ts = cfl_time_now();

        /* fluentbit_input_records_total */
        cmt_counter_add(in->cmt_records, ts, ic->added_records,
                        1, (char *[]) {(char *) flb_input_name(in)});

        /* fluentbit_input_bytes_total */
        cmt_counter_add(in->cmt_bytes, ts, buf_size,
                        1, (char *[]) {(char *) flb_input_name(in)});

        /* OLD api */
        flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
        flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
    }
#endif

    /* Apply filters */
    if (event_type == FLB_INPUT_LOGS) {
        flb_filter_do(ic,
                      buf, buf_size,
                      tag, tag_len, in->config);
    }

    /* get the chunks content size */
    content_size = cio_chunk_get_content_size(ic->chunk);

    /*
     * There is a case that rewrite_tag will modify the tag and keep rule is set
     * to drop the original record. The original record will still go through the
     * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by
     * metadata bytes (consisted by metadata bytes of the file chunk). This condition
     * sets the diff to 0 in order to not update the fs_chunks_size.
     */
    if (flb_input_chunk_get_size(ic) == 0) {
        /*
         * NOTE(review): this store is overwritten unconditionally by
         * 'real_diff = real_size - pre_real_size' further below, so it has
         * no effect as written — verify against the intent described above.
         */
        real_diff = 0;
    }

    /* Lock buffers where size > 2MB */
    if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
        cio_chunk_lock(ic->chunk);
    }

    /* Make sure the data was not filtered out and the buffer size is zero */
    if (content_size == 0) {
        flb_input_chunk_destroy(ic, FLB_TRUE);
        flb_input_chunk_set_limits(in);
        return 0;
    }
#ifdef FLB_HAVE_STREAM_PROCESSOR
    else if (in->config->stream_processor_ctx &&
             ic->event_type == FLB_INPUT_LOGS) {
        char *c_data;
        size_t c_size;

        /* Retrieve chunk (filtered) output content */
        cio_chunk_get_content(ic->chunk, &c_data, &c_size);

        /* Invoke stream processor passing only the newly appended bytes */
        flb_sp_do(in->config->stream_processor_ctx,
                  in,
                  tag, tag_len,
                  c_data + ic->stream_off, c_size - ic->stream_off);
        ic->stream_off += (c_size - ic->stream_off);
    }
#endif

    if (set_down == FLB_TRUE) {
        cio_chunk_down(ic->chunk);
    }

    /*
     * If the instance is not routable, there is no need to keep the
     * content in the storage engine, just get rid of it.
     */
    if (in->routable == FLB_FALSE) {
        flb_input_chunk_destroy(ic, FLB_TRUE);
        return 0;
    }

    /* Update memory counters and adjust limits if any */
    flb_input_chunk_set_limits(in);

    /*
     * Check if we are overlimit and validate if is there any filesystem
     * storage type asociated to this input instance, if so, unload the
     * chunk content from memory to respect imposed limits.
     *
     * Calling cio_chunk_down() the memory map associated and the file
     * descriptor will be released. At any later time, it must be bring up
     * for I/O operations.
     */
    si = (struct flb_storage_input *) in->storage;
    if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE &&
        si->type == FLB_STORAGE_FS) {
        if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
            /*
             * If we are already over limit, a sub-sequent data ingestion
             * might need a Chunk to write data in. As an optimization we
             * will put this Chunk down ONLY IF it has less than 1% of
             * it capacity as available space, otherwise keep it 'up' so
             * it available space can be used.
             */
            content_size = cio_chunk_get_content_size(ic->chunk);

            /* Do we have less than 1% available ? */
            min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
            if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
                cio_chunk_down(ic->chunk);
            }
        }
    }

    /* propagate the size delta to every routed output's accounting */
    real_size = flb_input_chunk_get_real_size(ic);
    real_diff = real_size - pre_real_size;
    if (real_diff != 0) {
        flb_debug("[input chunk] update output instances with new chunk size diff=%zd, records=%zu, input=%s",
                  real_diff, n_records, flb_input_name(in));
        flb_input_chunk_update_output_instances(ic, real_diff);
    }

#ifdef FLB_HAVE_CHUNK_TRACE
    if (ic->trace) {
        flb_chunk_trace_pre_output(ic->trace);
    }
#endif /* FLB_HAVE_CHUNK_TRACE */

    flb_input_chunk_protect(in);
    return 0;
}

/* Release an input_chunk_raw and everything it owns (payload buffer and tag) */
static void destroy_chunk_raw(struct input_chunk_raw *cr)
{
    if (cr->buf_data) {
        flb_free(cr->buf_data);
    }

    if (cr->tag) {
        flb_sds_destroy(cr->tag);
    }

    flb_free(cr);
}

/*
 * Used by threaded input instances: wrap a copy of the record buffer in an
 * input_chunk_raw and enqueue it into the instance ring buffer so the main
 * thread can ingest it later (see flb_input_chunk_ring_buffer_collector).
 */
static int append_to_ring_buffer(struct flb_input_instance *ins,
                                 int event_type,
                                 size_t records,
                                 const char *tag,
                                 size_t tag_len,
                                 const void *buf,
                                 size_t buf_size)

{
    int ret;
    int retries = 0;
    int retry_limit = 10;
    struct input_chunk_raw *cr;

    cr = flb_calloc(1, sizeof(struct input_chunk_raw));
    if (!cr) {
        flb_errno();
        return -1;
    }
    cr->ins = ins;
    cr->event_type = event_type;

    if (tag && tag_len > 0) {
        cr->tag = flb_sds_create_len(tag, tag_len);
        if (!cr->tag) {
            flb_free(cr);
            return -1;
        }
    }
    else {
        cr->tag = NULL;
    }

    cr->records = records;
    cr->buf_data = flb_malloc(buf_size);
    if (!cr->buf_data) {
        flb_errno();
        destroy_chunk_raw(cr);
        return -1;
    }

    /*
     * this memory copy is just a simple overhead, the problem we have is that
     * input instances always assume that they have to release their
     * buffer since
     * the append raw operation already did a copy. Not a big issue but maybe this
     * is a tradeoff...
     */
    memcpy(cr->buf_data, buf, buf_size);
    cr->buf_size = buf_size;



retry:
    /*
     * There is a little chance that the ring buffer is full or due to saturation
     * from the main thread the data is not being consumed. On this scenario we
     * retry up to 'retry_limit' times with a little wait time.
     */
    if (retries >= retry_limit) {
        flb_plg_error(ins, "could not enqueue records into the ring buffer");
        destroy_chunk_raw(cr);
        return -1;
    }

    /* append chunk raw context to the ring buffer (ownership of 'cr' passes on success) */
    ret = flb_ring_buffer_write(ins->rb, (void *) &cr, sizeof(cr));
    if (ret == -1) {
        flb_plg_debug(ins, "failed buffer write, retries=%i\n",
                      retries);

        /* sleep for 100000 microseconds (100 milliseconds) */
        usleep(100000);
        retries++;
        goto retry;
    }

    return 0;
}

/* iterate input instance ring buffer and remove any enqueued input_chunk_raw */
void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins)
{
    int ret;
    struct input_chunk_raw *cr;

    /* instance never had a ring buffer (not threaded) */
    if (!ins->rb) {
        return;
    }

    while ((ret = flb_ring_buffer_read(ins->rb, (void *) &cr, sizeof(cr))) == 0) {
        if (cr) {
            destroy_chunk_raw(cr);
            cr = NULL;
        }
    }
}

/*
 * Timer callback run on the main thread: drain every input instance ring
 * buffer and ingest the enqueued raw records, stopping early for instances
 * that are paused. 'data' is unused.
 */
void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
{
    int ret;
    int tag_len = 0;
    struct mk_list *head;
    struct flb_input_instance *ins;
    struct input_chunk_raw *cr;

    mk_list_foreach(head, &ctx->inputs) {
        ins = mk_list_entry(head, struct flb_input_instance, _head);
        cr = NULL;

        while (1) {
            /* a paused instance keeps its pending entries for later */
            if (flb_input_buf_paused(ins) == FLB_TRUE) {
                break;
            }

            ret = flb_ring_buffer_read(ins->rb,
                                       (void *) &cr,
                                       sizeof(cr));
            if (ret != 0) {
                break;
            }

            if (cr) {
                if (cr->tag) {
                    tag_len = flb_sds_len(cr->tag);
                }
                else {
                    tag_len = 0;
                }

                /* ingest on behalf of the originating instance */
                input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
                                       cr->tag, tag_len,
                                       cr->buf_data, cr->buf_size);

                destroy_chunk_raw(cr);
            }
            cr = NULL;
        }

        ins->rb->flush_pending = FLB_FALSE;
    }
}

/*
 * Public ingestion entry point: threaded instances enqueue a copy of the
 * data into their ring buffer, non-threaded instances append directly.
 * Returns 0 on success, -1 on failure.
 */
int flb_input_chunk_append_raw(struct flb_input_instance *in,
                               int event_type,
                               size_t records,
                               const char *tag, size_t tag_len,
                               const void *buf, size_t buf_size)
{
    int ret;

    /*
     * If the plugin instance registering the data runs in a separate thread, we must
     * add the data reference to the ring buffer.
     */
    if (flb_input_is_threaded(in)) {
        ret = append_to_ring_buffer(in, event_type, records,
                                    tag, tag_len,
                                    buf, buf_size);
    }
    else {
        ret = input_chunk_append_raw(in, event_type, records,
                                     tag, tag_len, buf, buf_size);
    }

    return ret;
}

/*
 * Retrieve a reference to the chunk content buffer and its size; marks the
 * chunk as busy since the reference is typically handed to an outgoing task.
 * Returns NULL when the chunk cannot be brought up or has no content.
 */
const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
{
    int ret;
    size_t pre_size;
    size_t post_size;
    ssize_t diff_size;
    char *buf = NULL;

    pre_size = flb_input_chunk_get_real_size(ic);

    if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
        ret = cio_chunk_up(ic->chunk);
        if (ret == -1) {
            return NULL;
        }
    }

    /* Lock the internal chunk
     *
     * This operation has to be performed before getting the chunk data
     * pointer because in certain situations it could cause the chunk
     * mapping to be relocated (ie. macos / windows on trim)
     */
    cio_chunk_lock(ic->chunk);

    /*
     * msgpack-c internal use a raw buffer for it operations, since we
     * already appended data we just can take out the references to avoid
     * a new memory allocation and skip a copy operation.
 */
    ret = cio_chunk_get_content(ic->chunk, &buf, size);

    if (ret == -1) {
        flb_error("[input chunk] error retrieving chunk content");
        return NULL;
    }

    if (!buf) {
        *size = 0;
        return NULL;
    }

    /* Set it busy as it likely it's a reference for an outgoing task */
    ic->busy = FLB_TRUE;

    /*
     * Locking/bringing the chunk up may have changed its real size:
     * propagate the delta to the output instances accounting.
     * NOTE(review): diff_size is ssize_t but the callee takes size_t;
     * a shrink (post < pre) would wrap — confirm sizes can only grow here.
     */
    post_size = flb_input_chunk_get_real_size(ic);
    if (post_size != pre_size) {
        diff_size = post_size - pre_size;
        flb_input_chunk_update_output_instances(ic, diff_size);
    }
    return buf;
}

/* Clear the 'busy' flag set by flb_input_chunk_flush(); -1 if it was not set */
int flb_input_chunk_release_lock(struct flb_input_chunk *ic)
{
    if (ic->busy == FLB_FALSE) {
        return -1;
    }

    ic->busy = FLB_FALSE;
    return 0;
}

/* Return the chunkio-assigned name of the chunk */
flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic)
{
    struct cio_chunk *ch;

    ch = (struct cio_chunk *) ic->chunk;
    return ch->name;
}

/*
 * Check whether the chunk metadata starts with the Fluent Bit magic header:
 * two magic bytes, one event-type byte (inspected by the callers) and a
 * zero byte.
 */
static inline int input_chunk_has_magic_bytes(char *buf, int len)
{
    unsigned char *p;

    if (len < FLB_INPUT_CHUNK_META_HEADER) {
        return FLB_FALSE;
    }

    p = (unsigned char *) buf;
    if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 &&
        p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1 && p[3] == 0) {
        return FLB_TRUE;
    }

    return FLB_FALSE;
}

/*
 * Get the event type by retrieving the metadata header. NOTE: this function
 * only performs event type discovery by looking at the
 * header bytes of a chunk that exists on disk.
+ */ +int flb_input_chunk_get_event_type(struct flb_input_chunk *ic) +{ + int len; + int ret; + int type = -1; + char *buf = NULL; + + ret = cio_meta_read(ic->chunk, &buf, &len); + if (ret == -1) { + return -1; + } + + /* Check metadata header / magic bytes */ + if (input_chunk_has_magic_bytes(buf, len)) { + if (buf[2] == FLB_INPUT_CHUNK_TYPE_LOGS) { + type = FLB_INPUT_LOGS; + } + else if (buf[2] == FLB_INPUT_CHUNK_TYPE_METRICS) { + type = FLB_INPUT_METRICS; + } + else if (buf[2] == FLB_INPUT_CHUNK_TYPE_TRACES) { + type = FLB_INPUT_TRACES; + } + } + else { + type = FLB_INPUT_LOGS; + } + + + return type; +} + +int flb_input_chunk_get_tag(struct flb_input_chunk *ic, + const char **tag_buf, int *tag_len) +{ + int len; + int ret; + char *buf; + + ret = cio_meta_read(ic->chunk, &buf, &len); + if (ret == -1) { + *tag_len = -1; + *tag_buf = NULL; + return -1; + } + + /* If magic bytes exists, just set the offset */ + if (input_chunk_has_magic_bytes(buf, len)) { + *tag_len = len - FLB_INPUT_CHUNK_META_HEADER; + *tag_buf = buf + FLB_INPUT_CHUNK_META_HEADER; + } + else { + /* Old Chunk version without magic bytes */ + *tag_len = len; + *tag_buf = buf; + } + + return ret; +} + +/* + * Iterates all output instances that the chunk will be flushing to and summarize + * the total number of bytes in use after ingesting the new data. 
+ */ +void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, + size_t chunk_size) +{ + struct mk_list *head; + struct flb_output_instance *o_ins; + + /* for each output plugin, we update the fs_chunks_size */ + mk_list_foreach(head, &ic->in->config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + if (o_ins->total_limit_size == -1) { + continue; + } + + if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) { + /* + * if there is match on any index of 1's in the binary, it indicates + * that the input chunk will flush to this output instance + */ + FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, chunk_size); + o_ins->fs_chunks_size += chunk_size; + ic->fs_counted = FLB_TRUE; + + flb_debug("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, " + "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic), + o_ins->name, chunk_size, o_ins->fs_chunks_size); + } + } +} |