summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_input_chunk.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_input_chunk.c')
-rw-r--r--fluent-bit/src/flb_input_chunk.c2009
1 files changed, 0 insertions, 2009 deletions
diff --git a/fluent-bit/src/flb_input_chunk.c b/fluent-bit/src/flb_input_chunk.c
deleted file mode 100644
index c71ae3ef0..000000000
--- a/fluent-bit/src/flb_input_chunk.c
+++ /dev/null
@@ -1,2009 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define FS_CHUNK_SIZE_DEBUG(op) {flb_trace("[%d] %s -> fs_chunks_size = %zu", \
- __LINE__, op->name, op->fs_chunks_size);}
-#define FS_CHUNK_SIZE_DEBUG_MOD(op, chunk, mod) {flb_trace( \
- "[%d] %s -> fs_chunks_size = %zu mod=%zd chunk=%s", __LINE__, \
- op->name, op->fs_chunks_size, mod, flb_input_chunk_get_name(chunk));}
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_input_chunk.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_storage.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_router.h>
-#include <fluent-bit/flb_task.h>
-#include <fluent-bit/flb_routes_mask.h>
-#include <fluent-bit/flb_metrics.h>
-#include <fluent-bit/stream_processor/flb_sp.h>
-#include <fluent-bit/flb_ring_buffer.h>
-#include <chunkio/chunkio.h>
-#include <monkey/mk_core.h>
-
-
-#ifdef FLB_HAVE_CHUNK_TRACE
-#include <fluent-bit/flb_chunk_trace.h>
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
-
-#define BLOCK_UNTIL_KEYPRESS() {char temp_keypress_buffer; read(0, &temp_keypress_buffer, 1);}
-
-#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL 0
-#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1
-
-struct input_chunk_raw {
- struct flb_input_instance *ins;
- int event_type;
- size_t records;
- flb_sds_t tag;
- void *buf_data;
- size_t buf_size;
-};
-
-#ifdef FLB_HAVE_IN_STORAGE_BACKLOG
-
-extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
- size_t required_space);
-
-extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
- ssize_t *required_space);
-
-
-#else
-
-ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
- size_t required_space)
-{
- return 0;
-}
-
-int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
- ssize_t *required_space)
-{
- return 0;
-}
-
-#endif
-
-static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
- struct flb_input_chunk *old_ic,
- uint64_t o_id);
-
-static int flb_input_chunk_is_task_safe_delete(struct flb_task *task);
-
-static int flb_input_chunk_drop_task_route(
- struct flb_task *task,
- struct flb_output_instance *o_ins);
-
-static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);
-
-static int flb_input_chunk_release_space(
- struct flb_input_chunk *new_input_chunk,
- struct flb_input_instance *input_plugin,
- struct flb_output_instance *output_plugin,
- ssize_t *required_space,
- int release_scope)
-{
- struct mk_list *input_chunk_iterator_tmp;
- struct mk_list *input_chunk_iterator;
- int chunk_destroy_flag;
- struct flb_input_chunk *old_input_chunk;
- ssize_t released_space;
- int chunk_released;
- ssize_t chunk_size;
-
- released_space = 0;
-
- mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp,
- &input_plugin->chunks) {
- old_input_chunk = mk_list_entry(input_chunk_iterator,
- struct flb_input_chunk, _head);
-
- if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask,
- output_plugin->id)) {
- continue;
- }
-
- if (flb_input_chunk_safe_delete(new_input_chunk,
- old_input_chunk,
- output_plugin->id) == FLB_FALSE) {
- continue;
- }
-
- if (flb_input_chunk_drop_task_route(old_input_chunk->task,
- output_plugin) == FLB_FALSE) {
- continue;
- }
-
- chunk_size = flb_input_chunk_get_real_size(old_input_chunk);
- chunk_released = FLB_FALSE;
- chunk_destroy_flag = FLB_FALSE;
-
- if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) {
- flb_routes_mask_clear_bit(old_input_chunk->routes_mask,
- output_plugin->id);
-
- FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size);
- output_plugin->fs_chunks_size -= chunk_size;
-
- chunk_destroy_flag = flb_routes_mask_is_empty(
- old_input_chunk->routes_mask);
-
- chunk_released = FLB_TRUE;
- }
- else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) {
- chunk_destroy_flag = FLB_TRUE;
- }
-
- if (chunk_destroy_flag) {
- if (old_input_chunk->task != NULL) {
- /*
- * If the chunk is referenced by a task and task has no active route,
- * we need to destroy the task as well.
- */
- if (old_input_chunk->task->users == 0) {
- flb_debug("[task] drop task_id %d with no active route from input plugin %s",
- old_input_chunk->task->id, new_input_chunk->in->name);
- flb_task_destroy(old_input_chunk->task, FLB_TRUE);
-
- chunk_released = FLB_TRUE;
- }
- }
- else {
- flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
- flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name);
-
- flb_input_chunk_destroy(old_input_chunk, FLB_TRUE);
-
- chunk_released = FLB_TRUE;
- }
- }
-
- if (chunk_released) {
- released_space += chunk_size;
- }
-
- if (released_space >= *required_space) {
- break;
- }
- }
-
- *required_space -= released_space;
-
- return 0;
-}
-
-static void generate_chunk_name(struct flb_input_instance *in,
- char *out_buf, int buf_size)
-{
- struct flb_time tm;
- (void) in;
-
- flb_time_get(&tm);
- snprintf(out_buf, buf_size - 1,
- "%i-%lu.%4lu.flb",
- getpid(),
- tm.tm.tv_sec, tm.tm.tv_nsec);
-}
-
-ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
-{
- return cio_chunk_get_content_size(ic->chunk);
-}
-
-/*
- * When chunk is set to DOWN from memory, data_size is set to 0 and
- * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size
- * is used to track the size of chunks in filesystem so we need to call
- * cio_chunk_get_real_size to return the original size in the file system
- */
-static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
-{
- ssize_t meta_size;
- ssize_t size;
-
- size = cio_chunk_get_real_size(ic->chunk);
-
- if (size != 0) {
- return size;
- }
-
- // Real size is not synced to chunk yet
- size = flb_input_chunk_get_size(ic);
- if (size == 0) {
- flb_debug("[input chunk] no data in the chunk %s",
- flb_input_chunk_get_name(ic));
- return -1;
- }
-
- meta_size = cio_meta_size(ic->chunk);
- size += meta_size
- /* See https://github.com/edsiper/chunkio#file-layout for more details */
- + 2 /* HEADER BYTES */
- + 4 /* CRC32 */
- + 16 /* PADDING */
- + 2; /* METADATA LENGTH BYTES */
-
- return size;
-}
-
-int flb_input_chunk_write(void *data, const char *buf, size_t len)
-{
- int ret;
- struct flb_input_chunk *ic;
-
- ic = (struct flb_input_chunk *) data;
-
- ret = cio_chunk_write(ic->chunk, buf, len);
- return ret;
-}
-
-int flb_input_chunk_write_at(void *data, off_t offset,
- const char *buf, size_t len)
-{
- int ret;
- struct flb_input_chunk *ic;
-
- ic = (struct flb_input_chunk *) data;
-
- ret = cio_chunk_write_at(ic->chunk, offset, buf, len);
- return ret;
-}
-
-static int flb_input_chunk_drop_task_route(
- struct flb_task *task,
- struct flb_output_instance *output_plugin)
-{
- int route_status;
- int result;
-
- if (task == NULL) {
- return FLB_TRUE;
- }
-
- result = FLB_TRUE;
-
- if (task->users != 0) {
- result = FLB_FALSE;
-
- if (output_plugin != NULL) {
- flb_task_acquire_lock(task);
-
- route_status = flb_task_get_route_status(task, output_plugin);
-
- if (route_status == FLB_TASK_ROUTE_INACTIVE) {
- flb_task_set_route_status(task,
- output_plugin,
- FLB_TASK_ROUTE_DROPPED);
-
- result = FLB_TRUE;
- }
-
- flb_task_release_lock(task);
- }
- }
-
- return result;
-}
-
-
-/*
- * For input_chunk referenced by an outgoing task, we need to check
- * whether the chunk is in the middle of output flush callback
- */
-static int flb_input_chunk_is_task_safe_delete(struct flb_task *task)
-{
- if (!task) {
- return FLB_TRUE;
- }
-
- if (task->users != 0) {
- return FLB_FALSE;
- }
-
- return FLB_TRUE;
-}
-
-static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
- struct flb_input_chunk *old_ic,
- uint64_t o_id)
-{
- /* The chunk we want to drop should not be the incoming chunk */
- if (ic == old_ic) {
- return FLB_FALSE;
- }
-
- /*
- * Even if chunks from same input plugin have same routes_mask when created,
- * the routes_mask could be modified when new chunks is ingested. Therefore,
- * we still need to do the validation on the routes_mask with o_id.
- */
- if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) {
- return FLB_FALSE;
- }
-
- return FLB_TRUE;
-}
-
-int flb_input_chunk_release_space_compound(
- struct flb_input_chunk *new_input_chunk,
- struct flb_output_instance *output_plugin,
- size_t *local_release_requirement,
- int release_local_space)
-{
- ssize_t required_space_remainder;
- struct flb_input_instance *storage_backlog_instance;
- struct flb_input_instance *input_plugin_instance;
- struct mk_list *iterator;
- int result;
-
- storage_backlog_instance = output_plugin->config->storage_input_plugin;
-
- *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk);
- required_space_remainder = (ssize_t) *local_release_requirement;
-
- if (required_space_remainder > 0) {
- result = flb_input_chunk_release_space(new_input_chunk,
- storage_backlog_instance,
- output_plugin,
- &required_space_remainder,
- FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL);
- }
-
- if (required_space_remainder > 0) {
- result = sb_release_output_queue_space(output_plugin,
- &required_space_remainder);
- }
-
- if (release_local_space) {
- if (required_space_remainder > 0) {
- result = flb_input_chunk_release_space(new_input_chunk,
- new_input_chunk->in,
- output_plugin,
- &required_space_remainder,
- FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
- }
- }
-
- if (required_space_remainder > 0) {
- mk_list_foreach(iterator, &output_plugin->config->inputs) {
- input_plugin_instance = \
- mk_list_entry(iterator, struct flb_input_instance, _head);
-
- if (input_plugin_instance != new_input_chunk->in) {
- result = flb_input_chunk_release_space(
- new_input_chunk,
- input_plugin_instance,
- output_plugin,
- &required_space_remainder,
- FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
- }
-
- if (required_space_remainder <= 0) {
- break;
- }
- }
- }
-
- if (required_space_remainder < 0) {
- required_space_remainder = 0;
- }
-
- *local_release_requirement = (size_t) required_space_remainder;
-
- (void) result;
-
- return 0;
-}
-
-/*
- * Find a slot in the output instance to append the new data with size chunk_size, it
- * will drop the the oldest chunks when the limitation on local disk is reached.
- */
-int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
- size_t chunk_size, int overlimit)
-{
- int count;
- int result;
- struct mk_list *head;
- struct flb_output_instance *o_ins;
- size_t local_release_requirement;
-
- /*
- * For each output instances that will be over the limit after adding the new chunk,
- * we have to determine how many chunks needs to be removed. We will adjust the
- * routes_mask to only route to the output plugin that have enough space after
- * deleting some chunks fome the queue.
- */
- count = 0;
-
- mk_list_foreach(head, &ic->in->config->outputs) {
- o_ins = mk_list_entry(head, struct flb_output_instance, _head);
-
- if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 ||
- (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
- continue;
- }
-
- local_release_requirement = 0;
-
- result = flb_input_chunk_release_space_compound(
- ic, o_ins,
- &local_release_requirement,
- FLB_TRUE);
-
- if (result != 0 ||
- local_release_requirement != 0) {
- count++;
- }
- }
-
- if (count != 0) {
- flb_error("[input chunk] fail to drop enough chunks in order to place new data");
- exit(0);
- }
-
- return 0;
-}
-
-/*
- * Returns a non-zero result if any output instances will reach the limit
- * after buffering the new data
- */
-int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
- size_t chunk_size)
-{
- int overlimit = 0;
- struct mk_list *head;
- struct flb_output_instance *o_ins;
-
- mk_list_foreach(head, &ic->in->config->outputs) {
- o_ins = mk_list_entry(head, struct flb_output_instance, _head);
-
- if ((o_ins->total_limit_size == -1) ||
- (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
- continue;
- }
-
- FS_CHUNK_SIZE_DEBUG(o_ins);
- flb_debug("[input chunk] chunk %s required %ld bytes and %ld bytes left "
- "in plugin %s", flb_input_chunk_get_name(ic), chunk_size,
- o_ins->total_limit_size -
- o_ins->fs_backlog_chunks_size -
- o_ins->fs_chunks_size,
- o_ins->name);
-
- if ((o_ins->fs_chunks_size +
- o_ins->fs_backlog_chunks_size +
- chunk_size) > o_ins->total_limit_size) {
- overlimit |= (1 << o_ins->id);
- }
- }
-
- return overlimit;
-}
-
-/* Find a slot for the incoming data to buffer it in local file system
- * returns 0 if none of the routes can be written to
- */
-int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size)
-{
- int overlimit;
- overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size);
- if (overlimit != 0) {
- flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit);
- }
-
- return !flb_routes_mask_is_empty(ic->routes_mask);
-}
-
-/* Create an input chunk using a Chunk I/O */
-struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
- int event_type,
- void *chunk)
-{
- int records = 0;
- int tag_len;
- int has_routes;
- int ret;
- uint64_t ts;
- char *buf_data;
- size_t buf_size;
- size_t offset;
- ssize_t bytes;
- const char *tag_buf;
- struct flb_input_chunk *ic;
-
- /* Create context for the input instance */
- ic = flb_calloc(1, sizeof(struct flb_input_chunk));
- if (!ic) {
- flb_errno();
- return NULL;
- }
- ic->event_type = event_type;
- ic->busy = FLB_FALSE;
- ic->fs_counted = FLB_FALSE;
- ic->fs_backlog = FLB_TRUE;
- ic->chunk = chunk;
- ic->in = in;
- msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
-
- ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size);
- if (ret != CIO_OK) {
- flb_error("[input chunk] error retrieving content for metrics");
- flb_free(ic);
- return NULL;
- }
-
- if (ic->event_type == FLB_INPUT_LOGS) {
- /* Validate records in the chunk */
- ret = flb_mp_validate_log_chunk(buf_data, buf_size, &records, &offset);
- if (ret == -1) {
- /* If there are valid records, truncate the chunk size */
- if (records <= 0) {
- flb_plg_error(in,
- "chunk validation failed, data might be corrupted. "
- "No valid records found, the chunk will be discarded.");
- flb_free(ic);
- return NULL;
- }
- if (records > 0 && offset > 32) {
- flb_plg_warn(in,
- "chunk validation failed, data might be corrupted. "
- "Found %d valid records, failed content starts "
- "right after byte %lu. Recovering valid records.",
- records, offset);
-
- /* truncate the chunk to recover valid records */
- cio_chunk_write_at(chunk, offset, NULL, 0);
- }
- else {
- flb_plg_error(in,
- "chunk validation failed, data might be corrupted. "
- "Found %d valid records, failed content starts "
- "right after byte %lu. Cannot recover chunk,",
- records, offset);
- flb_free(ic);
- return NULL;
- }
- }
- }
- else if (ic->event_type == FLB_INPUT_METRICS) {
- ret = flb_mp_validate_metric_chunk(buf_data, buf_size, &records, &offset);
- if (ret == -1) {
- if (records <= 0) {
- flb_plg_error(in,
- "metrics chunk validation failed, data might be corrupted. "
- "No valid records found, the chunk will be discarded.");
- flb_free(ic);
- return NULL;
- }
- if (records > 0 && offset > 32) {
- flb_plg_warn(in,
- "metrics chunk validation failed, data might be corrupted. "
- "Found %d valid records, failed content starts "
- "right after byte %lu. Recovering valid records.",
- records, offset);
-
- /* truncate the chunk to recover valid records */
- cio_chunk_write_at(chunk, offset, NULL, 0);
- }
- else {
- flb_plg_error(in,
- "metrics chunk validation failed, data might be corrupted. "
- "Found %d valid records, failed content starts "
- "right after byte %lu. Cannot recover chunk,",
- records, offset);
- flb_free(ic);
- return NULL;
- }
-
- }
- }
- else if (ic->event_type == FLB_INPUT_TRACES) {
-
- }
-
- /* Skip chunks without content data */
- if (records == 0) {
- flb_plg_error(in,
- "chunk validation failed, data might be corrupted. "
- "No valid records found, the chunk will be discarded.");
- flb_free(ic);
- return NULL;
- }
-
- /*
- * If the content is valid and the chunk has extra padding zeros, just
- * perform an adjustment.
- */
- bytes = cio_chunk_get_content_size(chunk);
- if (bytes == -1) {
- flb_free(ic);
- return NULL;
- }
- if (offset < bytes) {
- cio_chunk_write_at(chunk, offset, NULL, 0);
- }
-
- /* Update metrics */
-#ifdef FLB_HAVE_METRICS
- ic->total_records = records;
- if (ic->total_records > 0) {
- /* timestamp */
- ts = cfl_time_now();
-
- /* fluentbit_input_records_total */
- cmt_counter_add(in->cmt_records, ts, ic->total_records,
- 1, (char *[]) {(char *) flb_input_name(in)});
-
- /* fluentbit_input_bytes_total */
- cmt_counter_add(in->cmt_bytes, ts, buf_size,
- 1, (char *[]) {(char *) flb_input_name(in)});
-
- /* OLD metrics */
- flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
- flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
- }
-#endif
-
- /* Get the the tag reference (chunk metadata) */
- ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
- if (ret == -1) {
- flb_error("[input chunk] error retrieving tag of input chunk");
- flb_free(ic);
- return NULL;
- }
-
- bytes = flb_input_chunk_get_real_size(ic);
- if (bytes < 0) {
- flb_warn("[input chunk] could not retrieve chunk real size");
- flb_free(ic);
- return NULL;
- }
-
- has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in);
- if (has_routes == 0) {
- flb_warn("[input chunk] no matching route for backoff log chunk %s",
- flb_input_chunk_get_name(ic));
- }
-
- mk_list_add(&ic->_head, &in->chunks);
-
- flb_input_chunk_update_output_instances(ic, bytes);
-
- return ic;
-}
-
-static int input_chunk_write_header(struct cio_chunk *chunk, int event_type,
- char *tag, int tag_len)
-
-{
- int ret;
- int meta_size;
- char *meta;
-
- /*
- * Prepare the Chunk metadata header
- * ----------------------------------
- * m[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0
- * m[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1
- * m[2] = type (FLB_INPUT_CHUNK_TYPE_LOG or FLB_INPUT_CHUNK_TYPE_METRIC or FLB_INPUT_CHUNK_TYPE_TRACE
- * m[3] = 0 (unused for now)
- */
-
- /* write metadata (tag) */
- if (tag_len > (65535 - FLB_INPUT_CHUNK_META_HEADER)) {
- /* truncate length */
- tag_len = 65535 - FLB_INPUT_CHUNK_META_HEADER;
- }
- meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len;
-
- /* Allocate buffer for metadata header */
- meta = flb_calloc(1, meta_size);
- if (!meta) {
- flb_errno();
- return -1;
- }
-
- /*
- * Write chunk header in a temporary buffer
- * ----------------------------------------
- */
-
- /* magic bytes */
- meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0;
- meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1;
-
- /* event type */
- if (event_type == FLB_INPUT_LOGS) {
- meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS;
- }
- else if (event_type == FLB_INPUT_METRICS) {
- meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS;
- }
- else if (event_type == FLB_INPUT_TRACES) {
- meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES;
- }
-
- /* unused byte */
- meta[3] = 0;
-
- /* copy the tag after magic bytes */
- memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len);
-
- /* Write tag into metadata section */
- ret = cio_meta_write(chunk, (char *) meta, meta_size);
- if (ret == -1) {
- flb_error("[input chunk] could not write metadata");
- flb_free(meta);
- return -1;
- }
- flb_free(meta);
-
- return 0;
-}
-
-struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
- const char *tag, int tag_len)
-{
- int ret;
- int err;
- int set_down = FLB_FALSE;
- int has_routes;
- char name[64];
- struct cio_chunk *chunk;
- struct flb_storage_input *storage;
- struct flb_input_chunk *ic;
-
- storage = in->storage;
-
- /* chunk name */
- generate_chunk_name(in, name, sizeof(name) - 1);
-
- /* open/create target chunk file */
- chunk = cio_chunk_open(storage->cio, storage->stream, name,
- CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err);
- if (!chunk) {
- flb_error("[input chunk] could not create chunk file: %s:%s",
- storage->stream->name, name);
- return NULL;
- }
- /*
- * If the returned chunk at open is 'down', just put it up, write the
- * content and set it down again.
- */
- ret = cio_chunk_is_up(chunk);
- if (ret == CIO_FALSE) {
- ret = cio_chunk_up_force(chunk);
- if (ret == -1) {
- cio_chunk_close(chunk, CIO_TRUE);
- return NULL;
- }
- set_down = FLB_TRUE;
- }
-
- /* Write chunk header */
- ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len);
- if (ret == -1) {
- cio_chunk_close(chunk, CIO_TRUE);
- return NULL;
- }
-
- /* Create context for the input instance */
- ic = flb_calloc(1, sizeof(struct flb_input_chunk));
- if (!ic) {
- flb_errno();
- cio_chunk_close(chunk, CIO_TRUE);
- return NULL;
- }
-
- /*
- * Check chunk content type to be created: depending of the value set by
- * the input plugin, this can be FLB_INPUT_LOGS, FLB_INPUT_METRICS or
- * FLB_INPUT_TRACES.
- */
- ic->event_type = event_type;
- ic->busy = FLB_FALSE;
- ic->fs_counted = FLB_FALSE;
- ic->chunk = chunk;
- ic->fs_backlog = FLB_FALSE;
- ic->in = in;
- ic->stream_off = 0;
- ic->task = NULL;
-#ifdef FLB_HAVE_METRICS
- ic->total_records = 0;
-#endif
-
- /* Calculate the routes_mask for the input chunk */
- has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in);
- if (has_routes == 0) {
- flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'",
- flb_input_chunk_get_name(ic), tag);
- }
-
- msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
- mk_list_add(&ic->_head, &in->chunks);
-
- if (set_down == FLB_TRUE) {
- cio_chunk_down(chunk);
- }
-
- if (event_type == FLB_INPUT_LOGS) {
- flb_hash_table_add(in->ht_log_chunks, tag, tag_len, ic, 0);
- }
- else if (event_type == FLB_INPUT_METRICS) {
- flb_hash_table_add(in->ht_metric_chunks, tag, tag_len, ic, 0);
- }
- else if (event_type == FLB_INPUT_TRACES) {
- flb_hash_table_add(in->ht_trace_chunks, tag, tag_len, ic, 0);
- }
-
- return ic;
-}
-
-int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,
- const char *tag_buf, int tag_len,
- int del)
-{
- ssize_t bytes;
- struct mk_list *head;
- struct flb_output_instance *o_ins;
-
- mk_list_foreach(head, &ic->in->config->outputs) {
- o_ins = mk_list_entry(head, struct flb_output_instance, _head);
-
- if (o_ins->total_limit_size == -1) {
- continue;
- }
-
- bytes = flb_input_chunk_get_real_size(ic);
- if (bytes == -1) {
- // no data in the chunk
- continue;
- }
-
- if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
- if (ic->fs_counted == FLB_TRUE) {
- FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
- o_ins->fs_chunks_size -= bytes;
- flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
- "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
- bytes, o_ins->name, o_ins->fs_chunks_size);
- }
- }
- }
-
- if (del == CIO_TRUE && tag_buf) {
- /*
- * "TRY" to delete any reference to this chunk ('ic') from the hash
- * table. Note that maybe the value is not longer available in the
- * entries if it was replaced: note that we always keep the last
- * chunk for a specific Tag.
- */
- if (ic->event_type == FLB_INPUT_LOGS) {
- flb_hash_table_del_ptr(ic->in->ht_log_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- else if (ic->event_type == FLB_INPUT_METRICS) {
- flb_hash_table_del_ptr(ic->in->ht_metric_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- else if (ic->event_type == FLB_INPUT_TRACES) {
- flb_hash_table_del_ptr(ic->in->ht_trace_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace != NULL) {
- flb_chunk_trace_destroy(ic->trace);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- cio_chunk_close(ic->chunk, del);
- mk_list_del(&ic->_head);
- flb_free(ic);
-
- return 0;
-}
-
-
-int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
-{
- int tag_len;
- int ret;
- ssize_t bytes;
- const char *tag_buf = NULL;
- struct mk_list *head;
- struct flb_output_instance *o_ins;
-
- if (flb_input_chunk_is_up(ic) == FLB_FALSE) {
- flb_input_chunk_set_up(ic);
- }
-
- mk_list_foreach(head, &ic->in->config->outputs) {
- o_ins = mk_list_entry(head, struct flb_output_instance, _head);
-
- if (o_ins->total_limit_size == -1) {
- continue;
- }
-
- bytes = flb_input_chunk_get_real_size(ic);
- if (bytes == -1) {
- // no data in the chunk
- continue;
- }
-
- if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
- if (ic->fs_counted == FLB_TRUE) {
- FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
- o_ins->fs_chunks_size -= bytes;
- flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
- "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
- bytes, o_ins->name, o_ins->fs_chunks_size);
- }
- }
- }
-
- /*
- * When a chunk is going to be destroyed, this can be in a down state,
- * since the next step is to retrieve the Tag we need to have the
- * content up.
- */
- ret = flb_input_chunk_is_up(ic);
- if (ret == FLB_FALSE) {
- ret = cio_chunk_up_force(ic->chunk);
- if (ret == -1) {
- flb_error("[input chunk] cannot load chunk: %s",
- flb_input_chunk_get_name(ic));
- }
- }
-
- /* Retrieve Tag */
- ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
- if (ret == -1) {
- flb_trace("[input chunk] could not retrieve chunk tag: %s",
- flb_input_chunk_get_name(ic));
- }
-
- if (del == CIO_TRUE && tag_buf) {
- /*
- * "TRY" to delete any reference to this chunk ('ic') from the hash
- * table. Note that maybe the value is not longer available in the
- * entries if it was replaced: note that we always keep the last
- * chunk for a specific Tag.
- */
- if (ic->event_type == FLB_INPUT_LOGS) {
- flb_hash_table_del_ptr(ic->in->ht_log_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- else if (ic->event_type == FLB_INPUT_METRICS) {
- flb_hash_table_del_ptr(ic->in->ht_metric_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- else if (ic->event_type == FLB_INPUT_TRACES) {
- flb_hash_table_del_ptr(ic->in->ht_trace_chunks,
- tag_buf, tag_len, (void *) ic);
- }
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace != NULL) {
- flb_chunk_trace_destroy(ic->trace);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- cio_chunk_close(ic->chunk, del);
- mk_list_del(&ic->_head);
- flb_free(ic);
-
- return 0;
-}
-
-/* Return or create an available chunk to write data */
-static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
- int event_type,
- const char *tag, int tag_len,
- size_t chunk_size, int *set_down)
-{
- int id = -1;
- int ret;
- int new_chunk = FLB_FALSE;
- size_t out_size;
- struct flb_input_chunk *ic = NULL;
-
- if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) {
- flb_plg_warn(in,
- "Tag set exceeds limit, truncating from %i to %i bytes",
- tag_len, FLB_INPUT_CHUNK_TAG_MAX);
- tag_len = FLB_INPUT_CHUNK_TAG_MAX;
- }
-
- if (event_type == FLB_INPUT_LOGS) {
- id = flb_hash_table_get(in->ht_log_chunks, tag, tag_len,
- (void *) &ic, &out_size);
- }
- else if (event_type == FLB_INPUT_METRICS) {
- id = flb_hash_table_get(in->ht_metric_chunks, tag, tag_len,
- (void *) &ic, &out_size);
- }
- else if (event_type == FLB_INPUT_TRACES) {
- id = flb_hash_table_get(in->ht_trace_chunks, tag, tag_len,
- (void *) &ic, &out_size);
- }
-
- if (id >= 0) {
- if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
- ic = NULL;
- }
- else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
- ret = cio_chunk_up_force(ic->chunk);
-
- if (ret == CIO_CORRUPTED) {
- if (in->config->storage_del_bad_chunks) {
- /* If the chunk is corrupted we need to discard it and
- * set ic to NULL so the system tries to allocate a new
- * chunk.
- */
-
- flb_error("[input chunk] discarding corrupted chunk");
- }
-
- flb_input_chunk_destroy_corrupted(ic,
- tag, tag_len,
- in->config->storage_del_bad_chunks);
-
- ic = NULL;
- }
- else if (ret != CIO_OK) {
- ic = NULL;
- }
-
- *set_down = FLB_TRUE;
- }
- }
-
- /* No chunk was found, we need to create a new one */
- if (!ic) {
- ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len);
- new_chunk = FLB_TRUE;
- if (!ic) {
- return NULL;
- }
- ic->event_type = event_type;
- }
-
- /*
- * If buffering this block of data will exceed one of the limit among all output instances
- * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks
- * (based in creation time) to get enough space for the incoming chunk.
- */
- if (!flb_routes_mask_is_empty(ic->routes_mask)
- && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) {
- /*
- * If the chunk is not newly created, the chunk might already have logs inside.
- * We cannot delete (reused) chunks here.
- * If the routes_mask is cleared after trying to append new data, we destroy
- * the chunk.
- */
- if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
- flb_input_chunk_destroy(ic, FLB_TRUE);
- }
- return NULL;
- }
-
- return ic;
-}
-
-static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i)
-{
- if (i->mem_buf_limit <= 0) {
- return FLB_FALSE;
- }
-
- if (i->mem_chunks_size >= i->mem_buf_limit) {
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i)
-{
- struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;
-
- if (storage->type == FLB_STORAGE_FS) {
- if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
- if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) {
- return FLB_TRUE;
- }
- }
- }
-
- return FLB_FALSE;
-}
-
-/*
- * Check all chunks associated to the input instance and summarize
- * the number of bytes in use.
- */
-size_t flb_input_chunk_total_size(struct flb_input_instance *in)
-{
- size_t total = 0;
- struct flb_storage_input *storage;
-
- storage = (struct flb_storage_input *) in->storage;
- total = cio_stream_size_chunks_up(storage->stream);
- return total;
-}
-
-/*
- * Count and update the number of bytes being used by the instance. Also
- * check if the instance is paused, if so, check if it can be resumed if
- * is not longer over the limits.
- *
- * It always returns the number of bytes in use.
- */
-size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
-{
- size_t total;
-
- /* Gather total number of enqueued bytes */
- total = flb_input_chunk_total_size(in);
-
- /* Register the total into the context variable */
- in->mem_chunks_size = total;
-
- /*
- * After the adjustments, validate if the plugin is overlimit or paused
- * and perform further adjustments.
- */
- if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE &&
- in->config->is_running == FLB_TRUE &&
- in->config->is_ingestion_active == FLB_TRUE &&
- in->mem_buf_status == FLB_INPUT_PAUSED) {
- in->mem_buf_status = FLB_INPUT_RUNNING;
- if (in->p->cb_resume) {
- flb_input_resume(in);
- flb_info("[input] %s resume (mem buf overlimit)",
- in->name);
- }
- }
- if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
- in->config->is_running == FLB_TRUE &&
- in->config->is_ingestion_active == FLB_TRUE &&
- in->storage_buf_status == FLB_INPUT_PAUSED) {
- in->storage_buf_status = FLB_INPUT_RUNNING;
- if (in->p->cb_resume) {
- flb_input_resume(in);
- flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
- in->name,
- ((struct flb_storage_input *)in->storage)->cio->total_chunks_up,
- ((struct flb_storage_input *)in->storage)->cio->max_chunks_up);
- }
- }
-
- return total;
-}
-
-/*
- * If the number of bytes in use by the chunks are over the imposed limit
- * by configuration, pause the instance.
- */
-static inline int flb_input_chunk_protect(struct flb_input_instance *i)
-{
- struct flb_storage_input *storage = i->storage;
-
- if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
- flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
- i->name,
- storage->cio->total_chunks_up,
- storage->cio->max_chunks_up);
- flb_input_pause(i);
- i->storage_buf_status = FLB_INPUT_PAUSED;
- return FLB_TRUE;
- }
-
- if (storage->type == FLB_STORAGE_FS) {
- return FLB_FALSE;
- }
-
- if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
- /*
- * if the plugin is already overlimit and the strategy is based on
- * a memory-ring-buffer logic, do not pause the plugin, upon next
- * try of ingestion 'memrb' will make sure to release some bytes.
- */
- if (i->storage_type == FLB_STORAGE_MEMRB) {
- return FLB_FALSE;
- }
-
- /*
- * The plugin is using 'memory' buffering only and already reached
- * it limit, just pause the ingestion.
- */
- flb_warn("[input] %s paused (mem buf overlimit)",
- i->name);
- flb_input_pause(i);
- i->mem_buf_status = FLB_INPUT_PAUSED;
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-/*
- * Validate if the chunk coming from the input plugin based on config and
- * resources usage must be 'up' or 'down' (applicable for filesystem storage
- * type).
- *
- * FIXME: can we find a better name for this function ?
- */
-int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
-{
- size_t total;
- struct flb_input_instance *in;
-
- in = ic->in;
-
- /* Gather total number of enqueued bytes */
- total = flb_input_chunk_total_size(in);
-
- /* Register the total into the context variable */
- in->mem_chunks_size = total;
-
- if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) {
- if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
- cio_chunk_down(ic->chunk);
-
- /* Adjust new counters */
- total = flb_input_chunk_total_size(ic->in);
- in->mem_chunks_size = total;
-
- return FLB_FALSE;
- }
- }
-
- return FLB_TRUE;
-}
-
-int flb_input_chunk_is_up(struct flb_input_chunk *ic)
-{
- return cio_chunk_is_up(ic->chunk);
-}
-
-int flb_input_chunk_down(struct flb_input_chunk *ic)
-{
- if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
- return cio_chunk_down(ic->chunk);
- }
-
- return 0;
-}
-
-int flb_input_chunk_set_up(struct flb_input_chunk *ic)
-{
- if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
- return cio_chunk_up(ic->chunk);
- }
-
- return 0;
-}
-
-static int memrb_input_chunk_release_space(struct flb_input_instance *ins,
- size_t required_space,
- size_t *dropped_chunks, size_t *dropped_bytes)
-{
- int ret;
- int released;
- size_t removed_chunks = 0;
- ssize_t chunk_size;
- ssize_t released_space = 0;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_input_chunk *ic;
-
- mk_list_foreach_safe(head, tmp, &ins->chunks) {
- ic = mk_list_entry(head, struct flb_input_chunk, _head);
-
- /* check if is there any task or no users associated */
- ret = flb_input_chunk_is_task_safe_delete(ic->task);
- if (ret == FLB_FALSE) {
- continue;
- }
-
- /* get chunk size */
- chunk_size = flb_input_chunk_get_real_size(ic);
-
- released = FLB_FALSE;
- if (ic->task != NULL) {
- if (ic->task->users == 0) {
- flb_task_destroy(ic->task, FLB_TRUE);
- released = FLB_TRUE;
- }
- }
- else {
- flb_input_chunk_destroy(ic, FLB_TRUE);
- released = FLB_TRUE;
- }
-
- if (released) {
- released_space += chunk_size;
- removed_chunks++;
- }
-
- if (released_space >= required_space) {
- break;
- }
- }
-
- /* no matter if we succeeded or not, set the counters */
- *dropped_bytes = released_space;
- *dropped_chunks = removed_chunks;
-
- /* set the final status of the operation */
- if (released_space >= required_space) {
- return 0;
- }
-
- return -1;
-}
-
-/* Append a RAW MessagPack buffer to the input instance */
-static int input_chunk_append_raw(struct flb_input_instance *in,
- int event_type,
- size_t n_records,
- const char *tag, size_t tag_len,
- const void *buf, size_t buf_size)
-{
- int ret;
- int set_down = FLB_FALSE;
- int min;
- int new_chunk = FLB_FALSE;
- uint64_t ts;
- char *name;
- size_t dropped_chunks;
- size_t dropped_bytes;
- size_t content_size;
- size_t real_diff;
- size_t real_size;
- size_t pre_real_size;
- struct flb_input_chunk *ic;
- struct flb_storage_input *si;
-
- /* memory ring-buffer checker */
- if (in->storage_type == FLB_STORAGE_MEMRB) {
- /* check if we are overlimit */
- ret = flb_input_chunk_is_mem_overlimit(in);
- if (ret) {
- /* reset counters */
- dropped_chunks = 0;
- dropped_bytes = 0;
-
- /* try to release 'buf_size' */
- ret = memrb_input_chunk_release_space(in, buf_size,
- &dropped_chunks, &dropped_bytes);
-
- /* update metrics if required */
- if (dropped_chunks > 0 || dropped_bytes > 0) {
- /* timestamp and input plugin name for label */
- ts = cfl_time_now();
- name = (char *) flb_input_name(in);
-
- /* update counters */
- cmt_counter_add(in->cmt_memrb_dropped_chunks, ts,
- dropped_chunks, 1, (char *[]) {name});
-
- cmt_counter_add(in->cmt_memrb_dropped_bytes, ts,
- dropped_bytes, 1, (char *[]) {name});
- }
-
- if (ret != 0) {
- /* we could not allocate the required space, just return */
- return -1;
- }
- }
- }
-
- /* Check if the input plugin has been paused */
- if (flb_input_buf_paused(in) == FLB_TRUE) {
- flb_debug("[input chunk] %s is paused, cannot append records",
- in->name);
- return -1;
- }
-
- if (buf_size == 0) {
- flb_debug("[input chunk] skip ingesting data with 0 bytes");
- return -1;
- }
-
- /*
- * Some callers might not set a custom tag, on that case just inherit
- * the fixed instance tag or instance name.
- */
- if (!tag) {
- if (in->tag && in->tag_len > 0) {
- tag = in->tag;
- tag_len = in->tag_len;
- }
- else {
- tag = in->name;
- tag_len = strlen(in->name);
- }
- }
-
- /*
- * Get a target input chunk, can be one with remaining space available
- * or a new one.
- */
- ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down);
- if (!ic) {
- flb_error("[input chunk] no available chunk");
- return -1;
- }
-
- /* newly created chunk */
- if (flb_input_chunk_get_size(ic) == 0) {
- new_chunk = FLB_TRUE;
- }
-
- /* We got the chunk, validate if is 'up' or 'down' */
- ret = flb_input_chunk_is_up(ic);
- if (ret == FLB_FALSE) {
- ret = cio_chunk_up_force(ic->chunk);
- if (ret == -1) {
- flb_error("[input chunk] cannot retrieve temporary chunk");
- return -1;
- }
- set_down = FLB_TRUE;
- }
-
- /*
- * Keep the previous real size to calculate the real size
- * difference for flb_input_chunk_update_output_instances(),
- * use 0 when the chunk is new since it's size will never
- * have been calculated before.
- */
- if (new_chunk == FLB_TRUE) {
- pre_real_size = 0;
- }
- else {
- pre_real_size = flb_input_chunk_get_real_size(ic);
- }
-
- /* Write the new data */
- ret = flb_input_chunk_write(ic, buf, buf_size);
- if (ret == -1) {
- flb_error("[input chunk] error writing data from %s instance",
- in->name);
- cio_chunk_tx_rollback(ic->chunk);
- return -1;
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- flb_chunk_trace_do_input(ic);
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- /* Update 'input' metrics */
-#ifdef FLB_HAVE_METRICS
- if (ret == CIO_OK) {
- ic->added_records = n_records;
- ic->total_records += n_records;
- }
-
- if (ic->total_records > 0) {
- /* timestamp */
- ts = cfl_time_now();
-
- /* fluentbit_input_records_total */
- cmt_counter_add(in->cmt_records, ts, ic->added_records,
- 1, (char *[]) {(char *) flb_input_name(in)});
-
- /* fluentbit_input_bytes_total */
- cmt_counter_add(in->cmt_bytes, ts, buf_size,
- 1, (char *[]) {(char *) flb_input_name(in)});
-
- /* OLD api */
- flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
- flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
- }
-#endif
-
- /* Apply filters */
- if (event_type == FLB_INPUT_LOGS) {
- flb_filter_do(ic,
- buf, buf_size,
- tag, tag_len, in->config);
- }
-
- /* get the chunks content size */
- content_size = cio_chunk_get_content_size(ic->chunk);
-
- /*
- * There is a case that rewrite_tag will modify the tag and keep rule is set
- * to drop the original record. The original record will still go through the
- * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by
- * metadata bytes (consisted by metadata bytes of the file chunk). This condition
- * sets the diff to 0 in order to not update the fs_chunks_size.
- */
- if (flb_input_chunk_get_size(ic) == 0) {
- real_diff = 0;
- }
-
- /* Lock buffers where size > 2MB */
- if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
- cio_chunk_lock(ic->chunk);
- }
-
- /* Make sure the data was not filtered out and the buffer size is zero */
- if (content_size == 0) {
- flb_input_chunk_destroy(ic, FLB_TRUE);
- flb_input_chunk_set_limits(in);
- return 0;
- }
-#ifdef FLB_HAVE_STREAM_PROCESSOR
- else if (in->config->stream_processor_ctx &&
- ic->event_type == FLB_INPUT_LOGS) {
- char *c_data;
- size_t c_size;
-
- /* Retrieve chunk (filtered) output content */
- cio_chunk_get_content(ic->chunk, &c_data, &c_size);
-
- /* Invoke stream processor */
- flb_sp_do(in->config->stream_processor_ctx,
- in,
- tag, tag_len,
- c_data + ic->stream_off, c_size - ic->stream_off);
- ic->stream_off += (c_size - ic->stream_off);
- }
-#endif
-
- if (set_down == FLB_TRUE) {
- cio_chunk_down(ic->chunk);
- }
-
- /*
- * If the instance is not routable, there is no need to keep the
- * content in the storage engine, just get rid of it.
- */
- if (in->routable == FLB_FALSE) {
- flb_input_chunk_destroy(ic, FLB_TRUE);
- return 0;
- }
-
- /* Update memory counters and adjust limits if any */
- flb_input_chunk_set_limits(in);
-
- /*
- * Check if we are overlimit and validate if is there any filesystem
- * storage type asociated to this input instance, if so, unload the
- * chunk content from memory to respect imposed limits.
- *
- * Calling cio_chunk_down() the memory map associated and the file
- * descriptor will be released. At any later time, it must be bring up
- * for I/O operations.
- */
- si = (struct flb_storage_input *) in->storage;
- if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE &&
- si->type == FLB_STORAGE_FS) {
- if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
- /*
- * If we are already over limit, a sub-sequent data ingestion
- * might need a Chunk to write data in. As an optimization we
- * will put this Chunk down ONLY IF it has less than 1% of
- * it capacity as available space, otherwise keep it 'up' so
- * it available space can be used.
- */
- content_size = cio_chunk_get_content_size(ic->chunk);
-
- /* Do we have less than 1% available ? */
- min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
- if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
- cio_chunk_down(ic->chunk);
- }
- }
- }
-
- real_size = flb_input_chunk_get_real_size(ic);
- real_diff = real_size - pre_real_size;
- if (real_diff != 0) {
- flb_debug("[input chunk] update output instances with new chunk size diff=%zd, records=%zu, input=%s",
- real_diff, n_records, flb_input_name(in));
- flb_input_chunk_update_output_instances(ic, real_diff);
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ic->trace) {
- flb_chunk_trace_pre_output(ic->trace);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- flb_input_chunk_protect(in);
- return 0;
-}
-
-static void destroy_chunk_raw(struct input_chunk_raw *cr)
-{
- if (cr->buf_data) {
- flb_free(cr->buf_data);
- }
-
- if (cr->tag) {
- flb_sds_destroy(cr->tag);
- }
-
- flb_free(cr);
-}
-
-static int append_to_ring_buffer(struct flb_input_instance *ins,
- int event_type,
- size_t records,
- const char *tag,
- size_t tag_len,
- const void *buf,
- size_t buf_size)
-
-{
- int ret;
- int retries = 0;
- int retry_limit = 10;
- struct input_chunk_raw *cr;
-
- cr = flb_calloc(1, sizeof(struct input_chunk_raw));
- if (!cr) {
- flb_errno();
- return -1;
- }
- cr->ins = ins;
- cr->event_type = event_type;
-
- if (tag && tag_len > 0) {
- cr->tag = flb_sds_create_len(tag, tag_len);
- if (!cr->tag) {
- flb_free(cr);
- return -1;
- }
- }
- else {
- cr->tag = NULL;
- }
-
- cr->records = records;
- cr->buf_data = flb_malloc(buf_size);
- if (!cr->buf_data) {
- flb_errno();
- destroy_chunk_raw(cr);
- return -1;
- }
-
- /*
- * this memory copy is just a simple overhead, the problem we have is that
- * input instances always assume that they have to release their buffer since
- * the append raw operation already did a copy. Not a big issue but maybe this
- * is a tradeoff...
- */
- memcpy(cr->buf_data, buf, buf_size);
- cr->buf_size = buf_size;
-
-
-
-retry:
- /*
- * There is a little chance that the ring buffer is full or due to saturation
- * from the main thread the data is not being consumed. On this scenario we
- * retry up to 'retry_limit' times with a little wait time.
- */
- if (retries >= retry_limit) {
- flb_plg_error(ins, "could not enqueue records into the ring buffer");
- destroy_chunk_raw(cr);
- return -1;
- }
-
- /* append chunk raw context to the ring buffer */
- ret = flb_ring_buffer_write(ins->rb, (void *) &cr, sizeof(cr));
- if (ret == -1) {
- flb_plg_debug(ins, "failed buffer write, retries=%i\n",
- retries);
-
- /* sleep for 100000 microseconds (100 milliseconds) */
- usleep(100000);
- retries++;
- goto retry;
- }
-
- return 0;
-}
-
-/* iterate input instance ring buffer and remove any enqueued input_chunk_raw */
-void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins)
-{
- int ret;
- struct input_chunk_raw *cr;
-
- if (!ins->rb) {
- return;
- }
-
- while ((ret = flb_ring_buffer_read(ins->rb, (void *) &cr, sizeof(cr))) == 0) {
- if (cr) {
- destroy_chunk_raw(cr);
- cr = NULL;
- }
- }
-}
-
-void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
-{
- int ret;
- int tag_len = 0;
- struct mk_list *head;
- struct flb_input_instance *ins;
- struct input_chunk_raw *cr;
-
- mk_list_foreach(head, &ctx->inputs) {
- ins = mk_list_entry(head, struct flb_input_instance, _head);
- cr = NULL;
-
- while (1) {
- if (flb_input_buf_paused(ins) == FLB_TRUE) {
- break;
- }
-
- ret = flb_ring_buffer_read(ins->rb,
- (void *) &cr,
- sizeof(cr));
- if (ret != 0) {
- break;
- }
-
- if (cr) {
- if (cr->tag) {
- tag_len = flb_sds_len(cr->tag);
- }
- else {
- tag_len = 0;
- }
-
- input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
- cr->tag, tag_len,
- cr->buf_data, cr->buf_size);
- destroy_chunk_raw(cr);
- }
- cr = NULL;
- }
-
- ins->rb->flush_pending = FLB_FALSE;
- }
-}
-
-int flb_input_chunk_append_raw(struct flb_input_instance *in,
- int event_type,
- size_t records,
- const char *tag, size_t tag_len,
- const void *buf, size_t buf_size)
-{
- int ret;
-
- /*
- * If the plugin instance registering the data runs in a separate thread, we must
- * add the data reference to the ring buffer.
- */
- if (flb_input_is_threaded(in)) {
- ret = append_to_ring_buffer(in, event_type, records,
- tag, tag_len,
- buf, buf_size);
- }
- else {
- ret = input_chunk_append_raw(in, event_type, records,
- tag, tag_len, buf, buf_size);
- }
-
- return ret;
-}
-
-/* Retrieve a raw buffer from a dyntag node */
-const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
-{
- int ret;
- size_t pre_size;
- size_t post_size;
- ssize_t diff_size;
- char *buf = NULL;
-
- pre_size = flb_input_chunk_get_real_size(ic);
-
- if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
- ret = cio_chunk_up(ic->chunk);
- if (ret == -1) {
- return NULL;
- }
- }
-
- /* Lock the internal chunk
- *
- * This operation has to be performed before getting the chunk data
- * pointer because in certain situations it could cause the chunk
- * mapping to be relocated (ie. macos / windows on trim)
- */
- cio_chunk_lock(ic->chunk);
-
- /*
- * msgpack-c internal use a raw buffer for it operations, since we
- * already appended data we just can take out the references to avoid
- * a new memory allocation and skip a copy operation.
- */
- ret = cio_chunk_get_content(ic->chunk, &buf, size);
-
- if (ret == -1) {
- flb_error("[input chunk] error retrieving chunk content");
- return NULL;
- }
-
- if (!buf) {
- *size = 0;
- return NULL;
- }
-
- /* Set it busy as it likely it's a reference for an outgoing task */
- ic->busy = FLB_TRUE;
-
- post_size = flb_input_chunk_get_real_size(ic);
- if (post_size != pre_size) {
- diff_size = post_size - pre_size;
- flb_input_chunk_update_output_instances(ic, diff_size);
- }
- return buf;
-}
-
-int flb_input_chunk_release_lock(struct flb_input_chunk *ic)
-{
- if (ic->busy == FLB_FALSE) {
- return -1;
- }
-
- ic->busy = FLB_FALSE;
- return 0;
-}
-
-flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic)
-{
- struct cio_chunk *ch;
-
- ch = (struct cio_chunk *) ic->chunk;
- return ch->name;
-}
-
-static inline int input_chunk_has_magic_bytes(char *buf, int len)
-{
- unsigned char *p;
-
- if (len < FLB_INPUT_CHUNK_META_HEADER) {
- return FLB_FALSE;
- }
-
- p = (unsigned char *) buf;
- if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 &&
- p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1 && p[3] == 0) {
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-/*
- * Get the event type by retrieving metadata header. NOTE: this function only event type discovery by looking at the
- * headers bytes of a chunk that exists on disk.
- */
-int flb_input_chunk_get_event_type(struct flb_input_chunk *ic)
-{
- int len;
- int ret;
- int type = -1;
- char *buf = NULL;
-
- ret = cio_meta_read(ic->chunk, &buf, &len);
- if (ret == -1) {
- return -1;
- }
-
- /* Check metadata header / magic bytes */
- if (input_chunk_has_magic_bytes(buf, len)) {
- if (buf[2] == FLB_INPUT_CHUNK_TYPE_LOGS) {
- type = FLB_INPUT_LOGS;
- }
- else if (buf[2] == FLB_INPUT_CHUNK_TYPE_METRICS) {
- type = FLB_INPUT_METRICS;
- }
- else if (buf[2] == FLB_INPUT_CHUNK_TYPE_TRACES) {
- type = FLB_INPUT_TRACES;
- }
- }
- else {
- type = FLB_INPUT_LOGS;
- }
-
-
- return type;
-}
-
-int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
- const char **tag_buf, int *tag_len)
-{
- int len;
- int ret;
- char *buf;
-
- ret = cio_meta_read(ic->chunk, &buf, &len);
- if (ret == -1) {
- *tag_len = -1;
- *tag_buf = NULL;
- return -1;
- }
-
- /* If magic bytes exists, just set the offset */
- if (input_chunk_has_magic_bytes(buf, len)) {
- *tag_len = len - FLB_INPUT_CHUNK_META_HEADER;
- *tag_buf = buf + FLB_INPUT_CHUNK_META_HEADER;
- }
- else {
- /* Old Chunk version without magic bytes */
- *tag_len = len;
- *tag_buf = buf;
- }
-
- return ret;
-}
-
-/*
- * Iterates all output instances that the chunk will be flushing to and summarize
- * the total number of bytes in use after ingesting the new data.
- */
-void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
- size_t chunk_size)
-{
- struct mk_list *head;
- struct flb_output_instance *o_ins;
-
- /* for each output plugin, we update the fs_chunks_size */
- mk_list_foreach(head, &ic->in->config->outputs) {
- o_ins = mk_list_entry(head, struct flb_output_instance, _head);
- if (o_ins->total_limit_size == -1) {
- continue;
- }
-
- if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
- /*
- * if there is match on any index of 1's in the binary, it indicates
- * that the input chunk will flush to this output instance
- */
- FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, chunk_size);
- o_ins->fs_chunks_size += chunk_size;
- ic->fs_counted = FLB_TRUE;
-
- flb_debug("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, "
- "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
- o_ins->name, chunk_size, o_ins->fs_chunks_size);
- }
- }
-}