diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/in_tail | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_tail')
29 files changed, 0 insertions, 7244 deletions
diff --git a/fluent-bit/plugins/in_tail/CMakeLists.txt b/fluent-bit/plugins/in_tail/CMakeLists.txt deleted file mode 100644 index 31d865218..000000000 --- a/fluent-bit/plugins/in_tail/CMakeLists.txt +++ /dev/null @@ -1,37 +0,0 @@ -set(src - tail_file.c - tail_dockermode.c - tail_scan.c - tail_config.c - tail_fs_stat.c - tail.c) - -if(FLB_HAVE_INOTIFY) -set(src - ${src} - tail_fs_inotify.c) -endif() - -if(FLB_SQLDB) -set(src - ${src} - tail_db.c) -endif() - -if(FLB_PARSER) - set(src - ${src} - tail_multiline.c - ) -endif() - -if(MSVC) - set(src - ${src} - win32/stat.c - win32/io.c - ) - FLB_PLUGIN(in_tail "${src}" "Shlwapi") -else() - FLB_PLUGIN(in_tail "${src}" "") -endif() diff --git a/fluent-bit/plugins/in_tail/tail.c b/fluent-bit/plugins/in_tail/tail.c deleted file mode 100644 index 34a0fec3d..000000000 --- a/fluent-bit/plugins/in_tail/tail.c +++ /dev/null @@ -1,783 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <stdio.h> -#include <stdlib.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_config.h> -#include <fluent-bit/flb_config_map.h> -#include <fluent-bit/flb_error.h> -#include <fluent-bit/flb_utils.h> - -#include "tail.h" -#include "tail_fs.h" -#include "tail_db.h" -#include "tail_file.h" -#include "tail_scan.h" -#include "tail_signal.h" -#include "tail_config.h" -#include "tail_dockermode.h" -#include "tail_multiline.h" - -static inline int consume_byte(flb_pipefd_t fd) -{ - int ret; - uint64_t val; - - /* We need to consume the byte */ - ret = flb_pipe_r(fd, (char *) &val, sizeof(val)); - if (ret <= 0) { - flb_errno(); - return -1; - } - - return 0; -} - -/* cb_collect callback */ -static int in_tail_collect_pending(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - int active = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_config *ctx = in_context; - struct flb_tail_file *file; - struct stat st; - uint64_t pre; - uint64_t total_processed = 0; - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - if (file->watch_fd == -1 || - (file->offset >= file->size)) { - ret = fstat(file->fd, &st); - - if (ret == -1) { - flb_errno(); - flb_tail_file_remove(file); - continue; - } - - file->size = st.st_size; - file->pending_bytes = (file->size - file->offset); - } - else { - memset(&st, 0, sizeof(struct stat)); - } - - if (file->pending_bytes <= 0) { - file->pending_bytes = (file->size - file->offset); - } - - if (file->pending_bytes <= 0) { - continue; - } - - if (ctx->event_batch_size > 0 && - total_processed >= ctx->event_batch_size) { - break; - } - - /* get initial offset to calculate the number of processed bytes later */ - pre = file->offset; - - ret = flb_tail_file_chunk(file); - - /* Update the total number of bytes processed */ - if (file->offset > pre) { - total_processed += (file->offset - pre); - } - - switch (ret) { - case FLB_TAIL_ERROR: - /* Could not longer read the file */ - flb_tail_file_remove(file); - break; - case FLB_TAIL_OK: - case FLB_TAIL_BUSY: - /* - * Adjust counter to verify if we need a further read(2) later. - * For more details refer to tail_fs_inotify.c:96. - */ - if (file->offset < file->size) { - file->pending_bytes = (file->size - file->offset); - active++; - } - else { - file->pending_bytes = 0; - } - break; - } - } - - /* If no more active files, consume pending signal so we don't get called again. */ - if (active == 0) { - tail_consume_pending(ctx); - } - - return 0; -} - -/* cb_collect callback */ -static int in_tail_collect_static(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - int active = 0; - int pre_size; - int pos_size; - int alter_size = 0; - int completed = FLB_FALSE; - char s_size[32]; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_config *ctx = in_context; - struct flb_tail_file *file; - uint64_t pre; - uint64_t total_processed = 0; - - /* Do a data chunk collection for each file */ - mk_list_foreach_safe(head, tmp, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - /* - * The list 'files_static' represents all the files that were discovered - * on startup that already contains data: these are called 'static files'. - * - * When processing static files, we don't know what kind of content they - * have and what kind of 'latency' might add to process all of them in - * a row. Despite we always 'try' to do a full round and process a - * fraction of them on every invocation of this function if we have a - * huge number of files we will face latency and make the main pipeline - * to degrade performance. - * - * In order to avoid this situation, we added a new option to the plugin - * called 'static_batch_size' which basically defines how many bytes can - * be processed on every invocation to process the static files. - * - * When the limit is reached, we just break the loop and as a side effect - * we allow other events keep processing. - */ - if (ctx->static_batch_size > 0 && - total_processed >= ctx->static_batch_size) { - break; - } - - /* get initial offset to calculate the number of processed bytes later */ - pre = file->offset; - - /* Process the file */ - ret = flb_tail_file_chunk(file); - - /* Update the total number of bytes processed */ - if (file->offset > pre) { - total_processed += (file->offset - pre); - } - - switch (ret) { - case FLB_TAIL_ERROR: - /* Could not longer read the file */ - flb_plg_debug(ctx->ins, "inode=%"PRIu64" collect static ERROR", - file->inode); - flb_tail_file_remove(file); - break; - case FLB_TAIL_OK: - case FLB_TAIL_BUSY: - active++; - break; - case FLB_TAIL_WAIT: - if (file->config->exit_on_eof) { - flb_plg_info(ctx->ins, "inode=%"PRIu64" file=%s ended, stop", - file->inode, file->name); - if (ctx->files_static_count == 1) { - flb_engine_exit(config); - } - } - /* Promote file to 'events' type handler */ - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s promote to TAIL_EVENT", - file->inode, file->name); - - /* - * When promoting a file from 'static' to 'event' mode, the promoter - * will check if the file has been rotated while it was being - * processed on this function, if so, it will try to check for the - * following condition: - * - * "discover a new possible file created due to rotation" - * - * If the condition above is met, a new file entry will be added to - * the list that we are processing and a 'new signal' will be send - * to the signal manager. But the signal manager will trigger the - * message only if no pending messages exists (to avoid queue size - * exhaustion). - * - * All good, but there is a corner case where if no 'active' files - * exists, the signal will be read and this function will not be - * called again and since the signal did not triggered the - * message, the 'new file' enqueued by the nested function - * might stay in stale mode (note that altering the length of this - * list will not be reflected yet) - * - * To fix the corner case, we use a variable called 'alter_size' - * that determinate if the size of the list keeps the same after - * a rotation, so it means: a new file was added. - * - * We use 'alter_size' as a helper in the conditional below to know - * when to stop processing the static list. - */ - if (alter_size == 0) { - pre_size = ctx->files_static_count; - } - ret = flb_tail_file_to_event(file); - if (ret == -1) { - flb_plg_debug(ctx->ins, "file=%s cannot promote, unregistering", - file->name); - flb_tail_file_remove(file); - } - - if (alter_size == 0) { - pos_size = ctx->files_static_count; - if (pre_size == pos_size) { - alter_size++; - } - } - break; - } - } - - /* - * If there are no more active static handlers, we consume the 'byte' that - * triggered this event so this is not longer called again. - */ - if (active == 0 && alter_size == 0) { - consume_byte(ctx->ch_manager[0]); - ctx->ch_reads++; - completed = FLB_TRUE; - } - - /* Debugging number of processed bytes */ - if (flb_log_check_level(ctx->ins->log_level, FLB_LOG_DEBUG)) { - flb_utils_bytes_to_human_readable_size(total_processed, - s_size, sizeof(s_size)); - if (completed) { - flb_plg_debug(ctx->ins, "[static files] processed %s, done", s_size); - } - else { - flb_plg_debug(ctx->ins, "[static files] processed %s", s_size); - } - } - - return 0; -} - -static int in_tail_watcher_callback(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - int ret = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_config *ctx = context; - struct flb_tail_file *file; - (void) config; - - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - if (file->is_link == FLB_TRUE) { - ret = flb_tail_file_is_rotated(ctx, file); - if (ret == FLB_FALSE) { - continue; - } - - /* The symbolic link name has been rotated */ - flb_tail_file_rotated(file); - } - } - return ret; -} - -int in_tail_collect_event(void *file, struct flb_config *config) -{ - int ret; - struct stat st; - struct flb_tail_file *f = file; - - ret = fstat(f->fd, &st); - if (ret == -1) { - flb_tail_file_remove(f); - return 0; - } - - ret = flb_tail_file_chunk(f); - switch (ret) { - case FLB_TAIL_ERROR: - /* Could not longer read the file */ - flb_tail_file_remove(f); - break; - case FLB_TAIL_OK: - case FLB_TAIL_WAIT: - break; - } - - return 0; -} - -/* Initialize plugin */ -static int in_tail_init(struct flb_input_instance *in, - struct flb_config *config, void *data) -{ - int ret = -1; - struct flb_tail_config *ctx = NULL; - - /* Allocate space for the configuration */ - ctx = flb_tail_config_create(in, config); - if (!ctx) { - return -1; - } - ctx->ins = in; - - /* Initialize file-system watcher */ - ret = flb_tail_fs_init(in, ctx, config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - - /* Scan path */ - flb_tail_scan(ctx->path_list, ctx); - - /* - * After the first scan (on start time), all new files discovered needs to be - * read from head, so we switch the 'read_from_head' flag to true so any - * other file discovered after a scan or a rotation are read from the - * beginning. - */ - ctx->read_from_head = FLB_TRUE; - - /* Set plugin context */ - flb_input_set_context(in, ctx); - - /* Register an event collector */ - ret = flb_input_set_collector_event(in, in_tail_collect_static, - ctx->ch_manager[0], config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_static = ret; - - /* Register re-scan: time managed by 'refresh_interval' property */ - ret = flb_input_set_collector_time(in, flb_tail_scan_callback, - ctx->refresh_interval_sec, - ctx->refresh_interval_nsec, - config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_scan = ret; - - /* Register watcher, interval managed by 'watcher_interval' property */ - ret = flb_input_set_collector_time(in, in_tail_watcher_callback, - ctx->watcher_interval, 0, - config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_watcher = ret; - - /* Register callback to purge rotated files */ - ret = flb_input_set_collector_time(in, flb_tail_file_purge, - ctx->rotate_wait, 0, - config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_rotated = ret; - - /* Register callback to process pending bytes in promoted files */ - ret = flb_input_set_collector_event(in, in_tail_collect_pending, - ctx->ch_pending[0], config);//1, 0, config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_pending = ret; - - - if (ctx->multiline == FLB_TRUE && ctx->parser) { - ctx->parser = NULL; - flb_plg_warn(in, "on multiline mode 'Parser' is not allowed " - "(parser disabled)"); - } - - /* Register callback to process docker mode queued buffer */ - if (ctx->docker_mode == FLB_TRUE) { - ret = flb_input_set_collector_time(in, flb_tail_dmode_pending_flush, - ctx->docker_mode_flush, 0, - config); - if (ret == -1) { - ctx->docker_mode = FLB_FALSE; - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_dmode_flush = ret; - } - -#ifdef FLB_HAVE_PARSER - /* Register callback to process multiline queued buffer */ - if (ctx->multiline == FLB_TRUE) { - ret = flb_input_set_collector_time(in, flb_tail_mult_pending_flush, - ctx->multiline_flush, 0, - config); - if (ret == -1) { - ctx->multiline = FLB_FALSE; - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_mult_flush = ret; - } -#endif - - return 0; -} - -/* Pre-run callback / before the event loop */ -static int in_tail_pre_run(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - struct flb_tail_config *ctx = in_context; - (void) ins; - - return tail_signal_manager(ctx); -} - -static int in_tail_exit(void *data, struct flb_config *config) -{ - (void) *config; - struct flb_tail_config *ctx = data; - - flb_tail_file_remove_all(ctx); - flb_tail_fs_exit(ctx); - flb_tail_config_destroy(ctx); - - return 0; -} - -static void in_tail_pause(void *data, struct flb_config *config) -{ - struct flb_tail_config *ctx = data; - - /* - * Pause general collectors: - * - * - static : static files lookup before promotion - */ - flb_input_collector_pause(ctx->coll_fd_static, ctx->ins); - flb_input_collector_pause(ctx->coll_fd_pending, ctx->ins); - - if (ctx->docker_mode == FLB_TRUE) { - flb_input_collector_pause(ctx->coll_fd_dmode_flush, ctx->ins); - if (config->is_ingestion_active == FLB_FALSE) { - flb_plg_info(ctx->ins, "flushing pending docker mode data..."); - flb_tail_dmode_pending_flush_all(ctx); - } - } - - if (ctx->multiline == FLB_TRUE) { - flb_input_collector_pause(ctx->coll_fd_mult_flush, ctx->ins); - if (config->is_ingestion_active == FLB_FALSE) { - flb_plg_info(ctx->ins, "flushing pending multiline data..."); - flb_tail_mult_pending_flush_all(ctx); - } - } - - /* Pause file system backend handlers */ - flb_tail_fs_pause(ctx); -} - -static void in_tail_resume(void *data, struct flb_config *config) -{ - struct flb_tail_config *ctx = data; - - flb_input_collector_resume(ctx->coll_fd_static, ctx->ins); - flb_input_collector_resume(ctx->coll_fd_pending, ctx->ins); - - if (ctx->docker_mode == FLB_TRUE) { - flb_input_collector_resume(ctx->coll_fd_dmode_flush, ctx->ins); - } - - if (ctx->multiline == FLB_TRUE) { - flb_input_collector_resume(ctx->coll_fd_mult_flush, ctx->ins); - } - - /* Pause file system backend handlers */ - flb_tail_fs_resume(ctx); -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_CLIST, "path", NULL, - 0, FLB_TRUE, offsetof(struct flb_tail_config, path_list), - "pattern specifying log files or multiple ones through " - "the use of common wildcards." - }, - { - FLB_CONFIG_MAP_CLIST, "exclude_path", NULL, - 0, FLB_TRUE, offsetof(struct flb_tail_config, exclude_list), - "Set one or multiple shell patterns separated by commas to exclude " - "files matching a certain criteria, e.g: 'exclude_path *.gz,*.zip'" - }, - { - FLB_CONFIG_MAP_STR, "key", "log", - 0, FLB_TRUE, offsetof(struct flb_tail_config, key), - "when a message is unstructured (no parser applied), it's appended " - "as a string under the key name log. This option allows to define an " - "alternative name for that key." - }, - { - FLB_CONFIG_MAP_BOOL, "read_from_head", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head), - "For new discovered files on start (without a database offset/position), read the " - "content from the head of the file, not tail." - }, - { - FLB_CONFIG_MAP_STR, "refresh_interval", "60", - 0, FLB_FALSE, 0, - "interval to refresh the list of watched files expressed in seconds." - }, - { - FLB_CONFIG_MAP_TIME, "watcher_interval", "2s", - 0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval), - }, - { - FLB_CONFIG_MAP_TIME, "progress_check_interval", "2s", - 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval), - }, - { - FLB_CONFIG_MAP_INT, "progress_check_interval_nsec", "0", - 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval_nsec), - }, - { - FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT, - 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait), - "specify the number of extra time in seconds to monitor a file once is " - "rotated in case some pending data is flushed." - }, - { - FLB_CONFIG_MAP_BOOL, "docker_mode", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode), - "If enabled, the plugin will recombine split Docker log lines before " - "passing them to any parser as configured above. This mode cannot be " - "used at the same time as Multiline." - }, - { - FLB_CONFIG_MAP_INT, "docker_mode_flush", "4", - 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode_flush), - "wait period time in seconds to flush queued unfinished split lines." - - }, -#ifdef FLB_HAVE_REGEX - { - FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL, - 0, FLB_FALSE, 0, - "specify the parser name to fetch log first line for muliline log" - }, -#endif - { - FLB_CONFIG_MAP_STR, "path_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_tail_config, path_key), - "set the 'key' name where the name of monitored file will be appended." - }, - { - FLB_CONFIG_MAP_STR, "offset_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_tail_config, offset_key), - "set the 'key' name where the offset of monitored file will be appended." - }, - { - FLB_CONFIG_MAP_TIME, "ignore_older", "0", - 0, FLB_TRUE, offsetof(struct flb_tail_config, ignore_older), - "ignore files older than 'ignore_older'. Supports m,h,d (minutes, " - "hours, days) syntax. Default behavior is to read all the files." - }, - { - FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_TAIL_CHUNK, - 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_chunk_size), - "set the initial buffer size to read data from files. This value is " - "used too to increase buffer size." - }, - { - FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_TAIL_CHUNK, - 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_max_size), - "set the limit of the buffer size per monitored file. When a buffer " - "needs to be increased (e.g: very long lines), this value is used to " - "restrict how much the memory buffer can grow. If reading a file exceed " - "this limit, the file is removed from the monitored file list." - }, - { - FLB_CONFIG_MAP_SIZE, "static_batch_size", FLB_TAIL_STATIC_BATCH_SIZE, - 0, FLB_TRUE, offsetof(struct flb_tail_config, static_batch_size), - "On start, Fluent Bit might process files which already contains data, " - "these files are called 'static' files. The configuration property " - "in question set's the maximum number of bytes to process per iteration " - "for the static files monitored." - }, - { - FLB_CONFIG_MAP_SIZE, "event_batch_size", FLB_TAIL_EVENT_BATCH_SIZE, - 0, FLB_TRUE, offsetof(struct flb_tail_config, event_batch_size), - "When Fluent Bit is processing files in event based mode the amount of" - "data available for consumption could be too much and cause the input plugin " - "to over extend and smother other plugins" - "The configuration property sets the maximum number of bytes to process per iteration " - "for the files monitored (in event mode)." - }, - { - FLB_CONFIG_MAP_BOOL, "skip_long_lines", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_long_lines), - "if a monitored file reach it buffer capacity due to a very long line " - "(buffer_max_size), the default behavior is to stop monitoring that " - "file. This option alter that behavior and instruct Fluent Bit to skip " - "long lines and continue processing other lines that fits into the buffer." - }, - { - FLB_CONFIG_MAP_BOOL, "exit_on_eof", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, exit_on_eof), - "exit Fluent Bit when reaching EOF on a monitored file." - }, - - { - FLB_CONFIG_MAP_BOOL, "skip_empty_lines", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines), - "Allows to skip empty lines." - }, - -#ifdef FLB_HAVE_INOTIFY - { - FLB_CONFIG_MAP_BOOL, "inotify_watcher", "true", - 0, FLB_TRUE, offsetof(struct flb_tail_config, inotify_watcher), - "set to false to use file stat watcher instead of inotify." - }, -#endif -#ifdef FLB_HAVE_REGEX - { - FLB_CONFIG_MAP_STR, "parser", NULL, - 0, FLB_FALSE, 0, - "specify the parser name to process an unstructured message." - }, - { - FLB_CONFIG_MAP_STR, "tag_regex", NULL, - 0, FLB_FALSE, 0, - "set a regex to extract fields from the file name and use them later to " - "compose the Tag." - }, -#endif - -#ifdef FLB_HAVE_SQLDB - { - FLB_CONFIG_MAP_STR, "db", NULL, - 0, FLB_FALSE, 0, - "set a database file to keep track of monitored files and it offsets." - }, - { - FLB_CONFIG_MAP_STR, "db.sync", "normal", - 0, FLB_FALSE, 0, - "set a database sync method. values: extra, full, normal and off." - }, - { - FLB_CONFIG_MAP_BOOL, "db.locking", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, db_locking), - "set exclusive locking mode, increase performance but don't allow " - "external connections to the database file." - }, - { - FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL", - 0, FLB_TRUE, offsetof(struct flb_tail_config, db_journal_mode), - "Option to provide WAL configuration for Work Ahead Logging mechanism (WAL). Enabling WAL " - "provides higher performance. Note that WAL is not compatible with " - "shared network file systems." - }, -#endif - - /* Multiline Options */ -#ifdef FLB_HAVE_PARSER - { - FLB_CONFIG_MAP_BOOL, "multiline", "false", - 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline), - "if enabled, the plugin will try to discover multiline messages and use " - "the proper parsers to compose the outgoing messages. Note that when this " - "option is enabled the Parser option is not used." - }, - { - FLB_CONFIG_MAP_TIME, "multiline_flush", FLB_TAIL_MULT_FLUSH, - 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline_flush), - "wait period time in seconds to process queued multiline messages." - }, - { - FLB_CONFIG_MAP_STR, "parser_firstline", NULL, - 0, FLB_FALSE, 0, - "name of the parser that matches the beginning of a multiline message. " - "Note that the regular expression defined in the parser must include a " - "group name (named capture)." - }, - { - FLB_CONFIG_MAP_STR_PREFIX, "parser_", NULL, - 0, FLB_FALSE, 0, - "optional extra parser to interpret and structure multiline entries. This " - "option can be used to define multiple parsers, e.g: Parser_1 ab1, " - "Parser_2 ab2, Parser_N abN." - }, - - /* Multiline Core Engine based API */ - { - FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_tail_config, multiline_parsers), - "specify one or multiple multiline parsers: docker, cri, go, java, etc." - }, -#endif - - /* EOF */ - {0} -}; - -struct flb_input_plugin in_tail_plugin = { - .name = "tail", - .description = "Tail files", - .cb_init = in_tail_init, - .cb_pre_run = in_tail_pre_run, - .cb_collect = NULL, - .cb_flush_buf = NULL, - .cb_pause = in_tail_pause, - .cb_resume = in_tail_resume, - .cb_exit = in_tail_exit, - .config_map = config_map, - .flags = 0 -}; diff --git a/fluent-bit/plugins/in_tail/tail.h b/fluent-bit/plugins/in_tail/tail.h deleted file mode 100644 index 074f7a49f..000000000 --- a/fluent-bit/plugins/in_tail/tail.h +++ /dev/null @@ -1,45 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_H -#define FLB_TAIL_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> - -/* Internal return values */ -#define FLB_TAIL_ERROR -1 -#define FLB_TAIL_OK 0 -#define FLB_TAIL_WAIT 1 -#define FLB_TAIL_BUSY 2 - -/* Consuming mode */ -#define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */ -#define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */ - -/* Config */ -#define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */ -#define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */ -#define FLB_TAIL_ROTATE_WAIT "5" /* time to monitor after rotation */ -#define FLB_TAIL_STATIC_BATCH_SIZE "50M" /* static batch size */ -#define FLB_TAIL_EVENT_BATCH_SIZE "50M" /* event batch size */ - -int in_tail_collect_event(void *file, struct flb_config *config); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_config.c b/fluent-bit/plugins/in_tail/tail_config.c deleted file mode 100644 index 360b3dbea..000000000 --- a/fluent-bit/plugins/in_tail/tail_config.c +++ /dev/null @@ -1,472 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#include <stdlib.h> -#include <fcntl.h> - -#include "tail_fs.h" -#include "tail_db.h" -#include "tail_config.h" -#include "tail_scan.h" -#include "tail_sql.h" -#include "tail_dockermode.h" - -#ifdef FLB_HAVE_PARSER -#include "tail_multiline.h" -#endif - -static int multiline_load_parsers(struct flb_tail_config *ctx) -{ - struct mk_list *head; - struct mk_list *head_p; - struct flb_config_map_val *mv; - struct flb_slist_entry *val = NULL; - struct flb_ml_parser_ins *parser_i; - - if (!ctx->multiline_parsers) { - return 0; - } - - /* Create Multiline context using the plugin instance name */ - ctx->ml_ctx = flb_ml_create(ctx->config, ctx->ins->name); - if (!ctx->ml_ctx) { - return -1; - } - - /* - * Iterate all 'multiline.parser' entries. Every entry is considered - * a group which can have multiple multiline parser instances. - */ - flb_config_map_foreach(head, mv, ctx->multiline_parsers) { - mk_list_foreach(head_p, mv->val.list) { - val = mk_list_entry(head_p, struct flb_slist_entry, _head); - - /* Create an instance of the defined parser */ - parser_i = flb_ml_parser_instance_create(ctx->ml_ctx, val->str); - if (!parser_i) { - return -1; - } - } - } - - return 0; -} - -struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, - struct flb_config *config) -{ - int ret; - int sec; - int i; - long nsec; - const char *tmp; - struct flb_tail_config *ctx; - - ctx = flb_calloc(1, sizeof(struct flb_tail_config)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->config = config; - ctx->ins = ins; - ctx->ignore_older = 0; - ctx->skip_long_lines = FLB_FALSE; -#ifdef FLB_HAVE_SQLDB - ctx->db_sync = 1; /* sqlite sync 'normal' */ -#endif - - /* Load the config map */ - ret = flb_input_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } - - /* Create the channel manager */ - ret = flb_pipe_create(ctx->ch_manager); - if (ret == -1) { - flb_errno(); - flb_free(ctx); - return NULL; - } - ctx->ch_reads = 0; - ctx->ch_writes = 0; - - /* Create the pending channel */ - ret = flb_pipe_create(ctx->ch_pending); - if (ret == -1) { - flb_errno(); - flb_tail_config_destroy(ctx); - return NULL; - } - /* Make pending channel non-blocking */ - for (i = 0; i <= 1; i++) { - ret = flb_pipe_set_nonblocking(ctx->ch_pending[i]); - if (ret == -1) { - flb_errno(); - flb_tail_config_destroy(ctx); - return NULL; - } - } - - /* Config: path/pattern to read files */ - if (!ctx->path_list || mk_list_size(ctx->path_list) == 0) { - flb_plg_error(ctx->ins, "no input 'path' was given"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* Config: seconds interval before to re-scan the path */ - tmp = flb_input_get_property("refresh_interval", ins); - if (!tmp) { - ctx->refresh_interval_sec = FLB_TAIL_REFRESH; - ctx->refresh_interval_nsec = 0; - } - else { - ret = flb_utils_time_split(tmp, &sec, &nsec); - if (ret == 0) { - ctx->refresh_interval_sec = sec; - ctx->refresh_interval_nsec = nsec; - - if (sec == 0 && nsec == 0) { - flb_plg_error(ctx->ins, "invalid 'refresh_interval' config " - "value (%s)", tmp); - flb_free(ctx); - return NULL; - } - - if (sec == 0 && nsec <= 1000000) { - flb_plg_warn(ctx->ins, "very low refresh_interval " - "(%i.%lu nanoseconds) might cause high CPU usage", - sec, nsec); - } - } - else { - flb_plg_error(ctx->ins, - "invalid 'refresh_interval' config value (%s)", - tmp); - flb_tail_config_destroy(ctx); - return NULL; - } - } - - /* Config: seconds interval to monitor file after rotation */ - if (ctx->rotate_wait <= 0) { - flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value"); - flb_free(ctx); - return NULL; - } - -#ifdef FLB_HAVE_PARSER - /* Config: multi-line support */ - if (ctx->multiline == FLB_TRUE) { - ret = flb_tail_mult_create(ctx, ins, config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return NULL; - } - } -#endif - - /* Config: Docker mode */ - if(ctx->docker_mode == FLB_TRUE) { - ret = flb_tail_dmode_create(ctx, ins, config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return NULL; - } - } - - /* Validate buffer limit */ - if (ctx->buf_chunk_size > ctx->buf_max_size) { - flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk"); - flb_free(ctx); - return NULL; - } - -#ifdef FLB_HAVE_REGEX - /* Parser / Format */ - tmp = flb_input_get_property("parser", ins); - if (tmp) { - ctx->parser = flb_parser_get(tmp, config); - if (!ctx->parser) { - flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); - } - } -#endif - - mk_list_init(&ctx->files_static); - mk_list_init(&ctx->files_event); - mk_list_init(&ctx->files_rotated); - - /* hash table for files lookups */ - ctx->static_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); - if (!ctx->static_hash) { - flb_plg_error(ctx->ins, "could not create static hash"); - flb_tail_config_destroy(ctx); - return NULL; - } - - ctx->event_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); - if (!ctx->event_hash) { - flb_plg_error(ctx->ins, "could not create event hash"); - flb_tail_config_destroy(ctx); - return NULL; - } - -#ifdef FLB_HAVE_SQLDB - ctx->db = NULL; -#endif - -#ifdef FLB_HAVE_REGEX - tmp = flb_input_get_property("tag_regex", ins); - if (tmp) { - ctx->tag_regex = flb_regex_create(tmp); - if (ctx->tag_regex) { - ctx->dynamic_tag = FLB_TRUE; - } - else { - flb_plg_error(ctx->ins, "invalid 'tag_regex' config value"); - } - } - else { - ctx->tag_regex = NULL; - } -#endif - - /* Check if it should use dynamic tags */ - tmp = strchr(ins->tag, '*'); - if (tmp) { - ctx->dynamic_tag = FLB_TRUE; - } - -#ifdef FLB_HAVE_SQLDB - /* Database options (needs to be set before the context) */ - tmp = flb_input_get_property("db.sync", ins); - if (tmp) { - if (strcasecmp(tmp, "extra") == 0) { - ctx->db_sync = 3; - } - else if (strcasecmp(tmp, "full") == 0) { - ctx->db_sync = 2; - } - else if (strcasecmp(tmp, "normal") == 0) { - ctx->db_sync = 1; - } - else if (strcasecmp(tmp, "off") == 0) { - ctx->db_sync = 0; - } - else { - flb_plg_error(ctx->ins, "invalid database 'db.sync' value"); - } - } - - /* Initialize database */ - tmp = flb_input_get_property("db", ins); - if (tmp) { - ctx->db = flb_tail_db_open(tmp, ins, ctx, config); - if (!ctx->db) { - flb_plg_error(ctx->ins, "could not open/create database"); - flb_tail_config_destroy(ctx); - return NULL; - } - } - - /* Journal mode check */ - tmp = flb_input_get_property("db.journal_mode", ins); - if (tmp) { - if (strcasecmp(tmp, "DELETE") != 0 && - strcasecmp(tmp, "TRUNCATE") != 0 && - strcasecmp(tmp, "PERSIST") != 0 && - strcasecmp(tmp, "MEMORY") != 0 && - strcasecmp(tmp, "WAL") != 0 && - strcasecmp(tmp, "OFF") != 0) { - - flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp); - flb_tail_config_destroy(ctx); - return NULL; - } - } - - /* Prepare Statement */ - if (ctx->db) { - /* SQL_GET_FILE */ - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_GET_FILE, - -1, - &ctx->stmt_get_file, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* SQL_INSERT_FILE */ - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_INSERT_FILE, - -1, - &ctx->stmt_insert_file, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* SQL_ROTATE_FILE */ - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_ROTATE_FILE, - -1, - &ctx->stmt_rotate_file, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* SQL_UPDATE_OFFSET */ - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_UPDATE_OFFSET, - -1, - &ctx->stmt_offset, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* SQL_DELETE_FILE */ - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_DELETE_FILE, - -1, - &ctx->stmt_delete_file, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement"); - flb_tail_config_destroy(ctx); - return NULL; - } - - } -#endif - -#ifdef FLB_HAVE_PARSER - /* Multiline core API */ - if (ctx->multiline_parsers && mk_list_size(ctx->multiline_parsers) > 0) { - ret = multiline_load_parsers(ctx); - if (ret != 0) { - flb_plg_error(ctx->ins, "could not load multiline parsers"); - flb_tail_config_destroy(ctx); - return NULL; - } - - /* Enable auto-flush routine */ - ret = flb_ml_auto_flush_init(ctx->ml_ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not start multiline auto-flush"); - flb_tail_config_destroy(ctx); - return NULL; - } - flb_plg_info(ctx->ins, "multiline core started"); - } -#endif - -#ifdef FLB_HAVE_METRICS - ctx->cmt_files_opened = cmt_counter_create(ins->cmt, - "fluentbit", "input", - "files_opened_total", - "Total number of opened files", - 1, (char *[]) {"name"}); - - ctx->cmt_files_closed = cmt_counter_create(ins->cmt, - "fluentbit", "input", - "files_closed_total", - "Total number of closed files", - 1, (char *[]) {"name"}); - - ctx->cmt_files_rotated = cmt_counter_create(ins->cmt, - "fluentbit", "input", - "files_rotated_total", - "Total number of rotated files", - 1, (char *[]) {"name"}); - - /* OLD metrics */ - flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, - "files_opened", ctx->ins->metrics); - flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED, - "files_closed", ctx->ins->metrics); - flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED, - "files_rotated", ctx->ins->metrics); -#endif - - return ctx; -} - -int flb_tail_config_destroy(struct flb_tail_config *config) -{ - -#ifdef FLB_HAVE_PARSER - flb_tail_mult_destroy(config); - - if (config->ml_ctx) { - flb_ml_destroy(config->ml_ctx); - } -#endif - - /* Close pipe ends */ - flb_pipe_close(config->ch_manager[0]); - flb_pipe_close(config->ch_manager[1]); - flb_pipe_close(config->ch_pending[0]); - flb_pipe_close(config->ch_pending[1]); - -#ifdef FLB_HAVE_REGEX - if (config->tag_regex) { - flb_regex_destroy(config->tag_regex); - } -#endif - -#ifdef FLB_HAVE_SQLDB - if (config->db != NULL) { - sqlite3_finalize(config->stmt_get_file); - sqlite3_finalize(config->stmt_insert_file); - sqlite3_finalize(config->stmt_delete_file); - sqlite3_finalize(config->stmt_rotate_file); - sqlite3_finalize(config->stmt_offset); - flb_tail_db_close(config->db); - } -#endif - - if (config->static_hash) { - flb_hash_table_destroy(config->static_hash); - } - if (config->event_hash) { - flb_hash_table_destroy(config->event_hash); - } - - flb_free(config); - return 0; -} diff --git a/fluent-bit/plugins/in_tail/tail_config.h b/fluent-bit/plugins/in_tail/tail_config.h deleted file mode 100644 index dcfa54e02..000000000 --- a/fluent-bit/plugins/in_tail/tail_config.h +++ /dev/null @@ -1,168 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_CONFIG_H -#define FLB_TAIL_CONFIG_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_parser.h> -#include <fluent-bit/flb_macros.h> -#include <fluent-bit/flb_sqldb.h> -#include <fluent-bit/flb_metrics.h> -#include <fluent-bit/flb_log_event.h> -#ifdef FLB_HAVE_REGEX -#include <fluent-bit/flb_regex.h> -#endif -#ifdef FLB_HAVE_PARSER -#include <fluent-bit/multiline/flb_ml.h> -#endif - -#include <xxhash.h> - -/* Metrics */ -#ifdef FLB_HAVE_METRICS -#define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */ -#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */ -#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */ -#endif - -struct flb_tail_config { - int fd_notify; /* inotify fd */ - flb_pipefd_t ch_manager[2]; /* pipe: channel manager */ - flb_pipefd_t ch_pending[2]; /* pipe: pending events */ - int ch_reads; /* count number if signal reads */ - int ch_writes; /* count number of signal writes */ - - /* Buffer Config */ - size_t buf_chunk_size; /* allocation chunks */ - size_t buf_max_size; /* max size of a buffer */ - - /* Static files processor */ - size_t static_batch_size; - - /* Event files processor */ - size_t event_batch_size; - - /* Collectors */ - int coll_fd_static; - int coll_fd_scan; - int coll_fd_watcher; - int coll_fd_rotated; - int coll_fd_pending; - int coll_fd_inactive; - int coll_fd_dmode_flush; - int coll_fd_mult_flush; - int coll_fd_progress_check; - - /* Backend collectors */ - int coll_fd_fs1; /* used by fs_inotify & fs_stat */ - int coll_fd_fs2; /* only used by fs_stat */ - - /* Configuration */ - int dynamic_tag; /* dynamic tag ? e.g: abc.* */ -#ifdef FLB_HAVE_REGEX - struct flb_regex *tag_regex;/* path to tag regex */ -#endif - int refresh_interval_sec; /* seconds to re-scan */ - long refresh_interval_nsec;/* nanoseconds to re-scan */ - int read_from_head; /* read new files from head */ - int rotate_wait; /* sec to wait on rotated files */ - int watcher_interval; /* watcher interval */ - int ignore_older; /* ignore fields older than X seconds */ - time_t last_pending; /* last time a 'pending signal' was emitted' */ - struct mk_list *path_list; /* list of paths to scan (glob) */ - flb_sds_t path_key; /* key name of file path */ - flb_sds_t key; /* key for unstructured record */ - int skip_long_lines; /* skip long lines */ - int skip_empty_lines; /* skip empty lines (off) */ - int exit_on_eof; /* exit fluent-bit on EOF, test */ - - int progress_check_interval; /* watcher interval */ - int progress_check_interval_nsec; /* watcher interval */ - -#ifdef FLB_HAVE_INOTIFY - int inotify_watcher; /* enable/disable inotify monitor */ -#endif - flb_sds_t offset_key; /* key name of file offset */ - - /* Database */ -#ifdef FLB_HAVE_SQLDB - struct flb_sqldb *db; - int db_sync; - int db_locking; - flb_sds_t db_journal_mode; - sqlite3_stmt *stmt_get_file; - sqlite3_stmt *stmt_insert_file; - sqlite3_stmt *stmt_delete_file; - sqlite3_stmt *stmt_rotate_file; - sqlite3_stmt *stmt_offset; -#endif - - /* Parser / Format */ - struct flb_parser *parser; - - /* Multiline */ - int multiline; /* multiline enabled ? */ - int multiline_flush; /* multiline flush/wait */ - struct flb_parser *mult_parser_firstline; - struct mk_list mult_parsers; - - /* Docker mode */ - int docker_mode; /* Docker mode enabled ? */ - int docker_mode_flush; /* Docker mode flush/wait */ - struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */ - - /* Multiline core engine */ - struct flb_ml *ml_ctx; - struct mk_list *multiline_parsers; - - uint64_t files_static_count; /* number of items in the static file list */ - struct mk_list files_static; - struct mk_list files_event; - - /* List of rotated files that needs to be removed after 'rotate_wait' */ - struct mk_list files_rotated; - - /* List of shell patterns used to exclude certain file names */ - struct mk_list *exclude_list; - - /* Plugin input instance */ - struct flb_input_instance *ins; - - struct flb_log_event_encoder log_event_encoder; - struct flb_log_event_decoder log_event_decoder; - - /* Metrics */ - struct cmt_counter *cmt_files_opened; - struct cmt_counter *cmt_files_closed; - struct cmt_counter *cmt_files_rotated; - - /* Hash: hash tables for quick acess to registered files */ - struct flb_hash_table *static_hash; - struct flb_hash_table *event_hash; - - struct flb_config *config; -}; - -struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, - struct flb_config *config); -int flb_tail_config_destroy(struct flb_tail_config *config); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_db.c b/fluent-bit/plugins/in_tail/tail_db.c deleted file mode 100644 index 664963b6d..000000000 --- a/fluent-bit/plugins/in_tail/tail_db.c +++ /dev/null @@ -1,277 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_sqldb.h> - -#include "tail_db.h" -#include "tail_sql.h" -#include "tail_file.h" - -struct query_status { - int id; - int rows; - int64_t offset; -}; - -/* Open or create database required by tail plugin */ -struct flb_sqldb *flb_tail_db_open(const char *path, - struct flb_input_instance *in, - struct flb_tail_config *ctx, - struct flb_config *config) -{ - int ret; - char tmp[64]; - struct flb_sqldb *db; - - /* Open/create the database */ - db = flb_sqldb_open(path, in->name, config); - if (!db) { - return NULL; - } - - /* Create table schema if it don't exists */ - ret = flb_sqldb_query(db, SQL_CREATE_FILES, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db: could not create 'in_tail_files' table"); - flb_sqldb_close(db); - return NULL; - } - - if (ctx->db_sync >= 0) { - snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, - ctx->db_sync); - ret = flb_sqldb_query(db, tmp, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db could not set pragma 'sync'"); - flb_sqldb_close(db); - return NULL; - } - } - - if (ctx->db_locking == FLB_TRUE) { - ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'"); - flb_sqldb_close(db); - return NULL; - } - } - - if (ctx->db_journal_mode) { - snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE, - ctx->db_journal_mode); - ret = flb_sqldb_query(db, tmp, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'"); - flb_sqldb_close(db); - return NULL; - } - } - - return db; -} - -int flb_tail_db_close(struct flb_sqldb *db) -{ - flb_sqldb_close(db); - return 0; -} - -/* - * Check if an file inode exists in the database. Return FLB_TRUE or - * FLB_FALSE - */ -static int db_file_exists(struct flb_tail_file *file, - struct flb_tail_config *ctx, - uint64_t *id, uint64_t *inode, off_t *offset) -{ - int ret; - int exists = FLB_FALSE; - - /* Bind parameters */ - sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode); - ret = sqlite3_step(ctx->stmt_get_file); - - if (ret == SQLITE_ROW) { - exists = FLB_TRUE; - - /* id: column 0 */ - *id = sqlite3_column_int64(ctx->stmt_get_file, 0); - - /* offset: column 2 */ - *offset = sqlite3_column_int64(ctx->stmt_get_file, 2); - - /* inode: column 3 */ - *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); - } - else if (ret == SQLITE_DONE) { - /* all good */ - } - else { - exists = -1; - } - - sqlite3_clear_bindings(ctx->stmt_get_file); - sqlite3_reset(ctx->stmt_get_file); - - return exists; - -} - -static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ctx) - -{ - int ret; - time_t created; - - /* Register the file */ - created = time(NULL); - - /* Bind parameters */ - sqlite3_bind_text(ctx->stmt_insert_file, 1, file->name, -1, 0); - sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset); - sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode); - sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); - - /* Run the insert */ - ret = sqlite3_step(ctx->stmt_insert_file); - if (ret != SQLITE_DONE) { - sqlite3_clear_bindings(ctx->stmt_insert_file); - sqlite3_reset(ctx->stmt_insert_file); - flb_plg_error(ctx->ins, "cannot execute insert file %s inode=%lu", - file->name, file->inode); - return -1; - } - - sqlite3_clear_bindings(ctx->stmt_insert_file); - sqlite3_reset(ctx->stmt_insert_file); - - /* Get the database ID for this file */ - return flb_sqldb_last_id(ctx->db); -} - -int flb_tail_db_file_set(struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - uint64_t id = 0; - off_t offset = 0; - uint64_t inode = 0; - - /* Check if the file exists */ - ret = db_file_exists(file, ctx, &id, &inode, &offset); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot execute query to check inode: %lu", - file->inode); - return -1; - } - - if (ret == FLB_FALSE) { - /* Get the database ID for this file */ - file->db_id = db_file_insert(file, ctx); - } - else { - file->db_id = id; - file->offset = offset; - } - - return 0; -} - -/* Update Offset v2 */ -int flb_tail_db_file_offset(struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - - /* Bind parameters */ - sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset); - sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id); - - ret = sqlite3_step(ctx->stmt_offset); - - if (ret != SQLITE_DONE) { - sqlite3_clear_bindings(ctx->stmt_offset); - sqlite3_reset(ctx->stmt_offset); - return -1; - } - - /* Verify number of updated rows */ - ret = sqlite3_changes(ctx->db->handler); - if (ret == 0) { - /* - * 'someone' like you 'the reader' or another user has deleted the database - * entry, just restore it. - */ - file->db_id = db_file_insert(file, ctx); - } - - sqlite3_clear_bindings(ctx->stmt_offset); - sqlite3_reset(ctx->stmt_offset); - - return 0; -} - -/* Mark a file as rotated v2 */ -int flb_tail_db_file_rotate(const char *new_name, - struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - - /* Bind parameters */ - sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0); - sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id); - - ret = sqlite3_step(ctx->stmt_rotate_file); - - sqlite3_clear_bindings(ctx->stmt_rotate_file); - sqlite3_reset(ctx->stmt_rotate_file); - - if (ret != SQLITE_DONE) { - return -1; - } - - return 0; -} - -/* Delete file entry from the database */ -int flb_tail_db_file_delete(struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - - /* Bind parameters */ - sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id); - ret = sqlite3_step(ctx->stmt_delete_file); - - sqlite3_clear_bindings(ctx->stmt_delete_file); - sqlite3_reset(ctx->stmt_delete_file); - - if (ret != SQLITE_DONE) { - flb_plg_error(ctx->ins, "db: error deleting entry from database: %s", - file->name); - return -1; - } - - flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); - return 0; -} diff --git a/fluent-bit/plugins/in_tail/tail_db.h b/fluent-bit/plugins/in_tail/tail_db.h deleted file mode 100644 index 7b5355d22..000000000 --- a/fluent-bit/plugins/in_tail/tail_db.h +++ /dev/null @@ -1,43 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_DB_H -#define FLB_TAIL_DB_H - -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_sqldb.h> - -#include "tail_file.h" - -struct flb_sqldb *flb_tail_db_open(const char *path, - struct flb_input_instance *in, - struct flb_tail_config *ctx, - struct flb_config *config); - -int flb_tail_db_close(struct flb_sqldb *db); -int flb_tail_db_file_set(struct flb_tail_file *file, - struct flb_tail_config *ctx); -int flb_tail_db_file_offset(struct flb_tail_file *file, - struct flb_tail_config *ctx); -int flb_tail_db_file_rotate(const char *new_name, - struct flb_tail_file *file, - struct flb_tail_config *ctx); -int flb_tail_db_file_delete(struct flb_tail_file *file, - struct flb_tail_config *ctx); -#endif diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.c b/fluent-bit/plugins/in_tail/tail_dockermode.c deleted file mode 100644 index a8f20f9cd..000000000 --- a/fluent-bit/plugins/in_tail/tail_dockermode.c +++ /dev/null @@ -1,459 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_unescape.h> - -#include "tail_config.h" -#include "tail_dockermode.h" -#include "tail_file_internal.h" - -int flb_tail_dmode_create(struct flb_tail_config *ctx, - struct flb_input_instance *ins, - struct flb_config *config) -{ - const char *tmp; - - if (ctx->multiline == FLB_TRUE) { - flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline " - "is enabled"); - return -1; - } - -#ifdef FLB_HAVE_REGEX - /* First line Parser */ - tmp = flb_input_get_property("docker_mode_parser", ins); - if (tmp) { - ctx->docker_mode_parser = flb_parser_get(tmp, config); - if (!ctx->docker_mode_parser) { - flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); - } - } - else { - ctx->docker_mode_parser = NULL; - } -#endif - - tmp = flb_input_get_property("docker_mode_flush", ins); - if (!tmp) { - ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH; - } - else { - ctx->docker_mode_flush = atoi(tmp); - if (ctx->docker_mode_flush <= 0) { - ctx->docker_mode_flush = 1; - } - } - - return 0; -} - -static int modify_json_cond(char *js, size_t js_len, - char **val, size_t *val_len, - char **out, size_t *out_len, - int cond(char*, size_t), - int mod(char*, size_t, char**, size_t*, void*), void *data) -{ - int ret; - struct flb_pack_state state; - jsmntok_t *t; - jsmntok_t *t_val = NULL; - int i; - int i_root = -1; - int i_key = -1; - char *old_val; - size_t old_val_len; - char *new_val = NULL; - size_t new_val_len = 0; - size_t mod_len; - - ret = flb_pack_state_init(&state); - if (ret != 0) { - ret = -1; - goto modify_json_cond_end; - } - - ret = flb_json_tokenise(js, js_len, &state); - if (ret != 0 || state.tokens_count == 0) { - ret = -1; - goto modify_json_cond_end; - } - - for (i = 0; i < state.tokens_count; i++) { - t = &state.tokens[i]; - - if (i_key >= 0) { - if (t->parent == i_key) { - if (t->type == JSMN_STRING) { - t_val = t; - } - break; - } - continue; - } - - if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) { - i_root = i; - continue; - } - if (i_root == -1) { - continue; - } - - if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) { - i_key = i; - } - } - - if (!t_val) { - ret = -1; - goto modify_json_cond_end; - } - - *out = js; - *out_len = js_len; - - if (val) { - *val = js + t_val->start; - } - if (val_len) { - *val_len = t_val->end - t_val->start; - } - - if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) { - old_val = js + t_val->start; - old_val_len = t_val->end - t_val->start; - ret = mod(old_val, old_val_len, &new_val, &new_val_len, data); - if (ret != 0) { - ret = -1; - goto modify_json_cond_end; - } - - ret = 1; - - if (new_val == old_val) { - goto modify_json_cond_end; - } - - mod_len = js_len + new_val_len - old_val_len; - *out = flb_malloc(mod_len); - if (!*out) { - flb_errno(); - flb_free(new_val); - ret = -1; - goto modify_json_cond_end; - } - *out_len = mod_len; - - memcpy(*out, js, t_val->start); - memcpy(*out + t_val->start, new_val, new_val_len); - memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end); - - flb_free(new_val); - } - - modify_json_cond_end: - flb_pack_state_reset(&state); - if (ret < 0) { - *out = NULL; - } - return ret; -} - -static int unesc_ends_with_nl(char *str, size_t len) -{ - char* unesc; - int unesc_len; - int nl; - - unesc = flb_malloc(len + 1); - if (!unesc) { - flb_errno(); - return FLB_FALSE; - } - unesc_len = flb_unescape_string(str, len, &unesc); - nl = unesc[unesc_len - 1] == '\n'; - flb_free(unesc); - return nl; -} - -static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data) -{ - flb_sds_t sds = data; - - if (flb_sds_len(sds) == 0) { - *out = str; - *out_len = len; - return 0; - } - - size_t mod_len = flb_sds_len(sds) + len; - *out = flb_malloc(mod_len); - if (!*out) { - flb_errno(); - return -1; - } - *out_len = mod_len; - - memcpy(*out, sds, flb_sds_len(sds)); - memcpy(*out + flb_sds_len(sds), str, len); - return 0; -} - -static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data) -{ - flb_sds_t sds = data; - size_t mod_len = flb_sds_len(sds); - *out = flb_malloc(mod_len); - if (!*out) { - flb_errno(); - return -1; - } - *out_len = mod_len; - - memcpy(*out, sds, flb_sds_len(sds)); - return 0; -} - -int flb_tail_dmode_process_content(time_t now, - char* line, size_t line_len, - char **repl_line, size_t *repl_line_len, - struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - char* val = NULL; - size_t val_len; - int ret; - void *out_buf = NULL; - size_t out_size; - struct flb_time out_time = {0}; - *repl_line = NULL; - *repl_line_len = 0; - flb_sds_t tmp; - flb_sds_t tmp_copy; - -#ifdef FLB_HAVE_REGEX - if (ctx->docker_mode_parser) { - ret = flb_parser_do(ctx->docker_mode_parser, line, line_len, - &out_buf, &out_size, &out_time); - flb_free(out_buf); - - /* - * Set dmode_firstline if the line meets the first-line requirement - */ - if(ret >= 0) { - file->dmode_firstline = true; - } - - /* - * Process if buffer contains full log line - */ - if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) { - /* - * Buffered log should be flushed out - * as current line meets first-line requirement - */ - if(ret >= 0) { - flb_tail_dmode_flush(file, ctx); - } - - /* - * Flush the buffer if multiline has not been detected yet - */ - if (!file->dmode_firstline) { - flb_tail_dmode_flush(file, ctx); - } - } - } -#endif - - ret = modify_json_cond(line, line_len, - &val, &val_len, - repl_line, repl_line_len, - unesc_ends_with_nl, - prepend_sds_to_str, file->dmode_buf); - if (ret >= 0) { - /* line is a valid json */ - flb_sds_len_set(file->dmode_lastline, 0); - - /* concatenate current log line with buffered one */ - tmp = flb_sds_cat(file->dmode_buf, val, val_len); - if (!tmp) { - flb_errno(); - return -1; - } - file->dmode_buf = tmp; - - tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len); - if (!tmp_copy) { - flb_errno(); - return -1; - } - - file->dmode_lastline = tmp_copy; - file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1); - - if (ret == 0) { - /* Line not ended with newline */ - file->dmode_complete = false; - } - else { - /* Line ended with newline */ - file->dmode_complete = true; -#ifdef FLB_HAVE_REGEX - if (!ctx->docker_mode_parser) { - flb_tail_dmode_flush(file, ctx); - } -#else - flb_tail_dmode_flush(file, ctx); -#endif - } - } - return ret; -} - -void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) -{ - int ret; - char *repl_line = NULL; - size_t repl_line_len = 0; - void *out_buf = NULL; - size_t out_size; - struct flb_time out_time = {0}; - time_t now = time(NULL); - - if (flb_sds_len(file->dmode_lastline) == 0) { - return; - } - - flb_time_zero(&out_time); - - ret = modify_json_cond(file->dmode_lastline, - flb_sds_len(file->dmode_lastline), - NULL, NULL, - &repl_line, &repl_line_len, - NULL, - use_sds, file->dmode_buf); - if (ret < 0) { - return; - } - - flb_sds_len_set(file->dmode_buf, 0); - flb_sds_len_set(file->dmode_lastline, 0); - file->dmode_flush_timeout = 0; - -#ifdef FLB_HAVE_REGEX - if (ctx->parser) { - ret = flb_parser_do(ctx->parser, repl_line, repl_line_len, - &out_buf, &out_size, &out_time); - if (ret >= 0) { - if (flb_time_to_double(&out_time) == 0) { - flb_time_get(&out_time); - } - if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) { - goto dmode_flush_end; - } - - flb_tail_pack_line_map(&out_time, (char**) &out_buf, &out_size, file, 0); - - goto dmode_flush_end; - } - } -#endif - - flb_tail_file_pack_line(NULL, repl_line, repl_line_len, file, 0); - - dmode_flush_end: - flb_free(repl_line); - flb_free(out_buf); -} - -static void file_pending_flush(struct flb_tail_config *ctx, - struct flb_tail_file *file, time_t now) -{ - if (file->dmode_flush_timeout > now) { - return; - } - - if (flb_sds_len(file->dmode_lastline) == 0) { - return; - } - - flb_tail_dmode_flush(file, ctx); - - if (file->sl_log_event_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, - file->tag_buf, - file->tag_len, - file->sl_log_event_encoder->output_buffer, - file->sl_log_event_encoder->output_length); - - flb_log_event_encoder_reset(file->sl_log_event_encoder); - } -} - -int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx) -{ - time_t expired; - struct mk_list *head; - struct flb_tail_file *file; - - expired = time(NULL) + 3600; - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - return 0; -} - -int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - time_t now; - struct mk_list *head; - struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - - now = time(NULL); - - /* Iterate static event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, now); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, now); - } - - return 0; -} diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.h b/fluent-bit/plugins/in_tail/tail_dockermode.h deleted file mode 100644 index 50869ff62..000000000 --- a/fluent-bit/plugins/in_tail/tail_dockermode.h +++ /dev/null @@ -1,38 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_DOCKERMODE_H -#define FLB_TAIL_DOCKERMODE_H - -#include "tail_file.h" -#define FLB_TAIL_DMODE_FLUSH 4 - -int flb_tail_dmode_create(struct flb_tail_config *ctx, - struct flb_input_instance *ins, struct flb_config *config); -int flb_tail_dmode_process_content(time_t now, - char* line, size_t line_len, - char **repl_line, size_t *repl_line_len, - struct flb_tail_file *file, - struct flb_tail_config *ctx); -void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx); -int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context); -int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c deleted file mode 100644 index 2385f0626..000000000 --- a/fluent-bit/plugins/in_tail/tail_file.c +++ /dev/null @@ -1,1860 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <time.h> -#ifdef FLB_SYSTEM_FREEBSD -#include <sys/user.h> -#include <libutil.h> -#endif - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_parser.h> -#ifdef FLB_HAVE_REGEX -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_hash_table.h> -#endif - -#include "tail.h" -#include "tail_file.h" -#include "tail_config.h" -#include "tail_db.h" -#include "tail_signal.h" -#include "tail_dockermode.h" -#include "tail_multiline.h" -#include "tail_scan.h" - -#ifdef FLB_SYSTEM_WINDOWS -#include "win32.h" -#endif - -#include <cfl/cfl.h> - -static inline void consume_bytes(char *buf, int bytes, int length) -{ - memmove(buf, buf + bytes, length - bytes); -} - -static uint64_t stat_get_st_dev(struct stat *st) -{ -#ifdef FLB_SYSTEM_WINDOWS - /* do you want to contribute with a way to extract volume serial number ? */ - return 0; -#else - return st->st_dev; -#endif -} - -static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st, - uint64_t *out_hash) -{ - int len; - uint64_t st_dev; - char tmp[64]; - - st_dev = stat_get_st_dev(st); - - len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64, - st_dev, (uint64_t)st->st_ino); - - *out_hash = cfl_hash_64bits(tmp, len); - return 0; -} - -static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st, - flb_sds_t *key) -{ - uint64_t st_dev; - flb_sds_t tmp; - flb_sds_t buf; - - buf = flb_sds_create_size(64); - if (!buf) { - return -1; - } - - st_dev = stat_get_st_dev(st); - tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64, - st_dev, (uint64_t)st->st_ino); - if (!tmp) { - flb_sds_destroy(buf); - return -1; - } - - *key = buf; - return 0; -} - -/* Append custom keys and report the number of records processed */ -static int record_append_custom_keys(struct flb_tail_file *file, - char *in_data, size_t in_size, - char **out_data, size_t *out_size) -{ - int i; - int ret; - int records = 0; - msgpack_object k; - msgpack_object v; - struct flb_log_event event; - struct flb_tail_config *ctx; - struct flb_log_event_encoder encoder; - struct flb_log_event_decoder decoder; - - ctx = (struct flb_tail_config *) file->config; - - ret = flb_log_event_decoder_init(&decoder, in_data, in_size); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - return -1; - } - - ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_log_event_decoder_destroy(&decoder); - - return -2; - } - - while (flb_log_event_decoder_next(&decoder, &event) == - FLB_EVENT_DECODER_SUCCESS) { - - ret = flb_log_event_encoder_begin_record(&encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp); - } - - /* append previous map keys */ - for (i = 0; i < event.body->via.map.size; i++) { - k = event.body->via.map.ptr[i].key; - v = event.body->via.map.ptr[i].val; - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_msgpack_object( - &encoder, - &k); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_msgpack_object( - &encoder, - &v); - } - } - - /* path_key */ - if (ctx->path_key != NULL) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->config->path_key); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->orig_name); - } - } - - /* offset_key */ - if (ctx->offset_key != NULL) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->config->offset_key); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_uint64( - &encoder, - file->offset + - file->last_processed_bytes); - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&encoder); - } - else { - flb_plg_error(file->config->ins, "error packing event : %d", ret); - - flb_log_event_encoder_rollback_record(&encoder); - } - - /* counter */ - records++; - } - - *out_data = encoder.output_buffer; - *out_size = encoder.output_length; - - /* This function transfers ownership of the internal memory allocated by - * sbuffer using msgpack_sbuffer_release which means the caller is - * responsible for releasing the memory. - */ - flb_log_event_encoder_claim_internal_buffer_ownership(&encoder); - - flb_log_event_decoder_destroy(&decoder); - flb_log_event_encoder_destroy(&encoder); - - return records; -} - -static int flb_tail_repack_map(struct flb_log_event_encoder *encoder, - char *data, - size_t data_size) -{ - msgpack_unpacked source_map; - size_t offset; - int result; - size_t index; - msgpack_object value; - msgpack_object key; - - result = FLB_EVENT_ENCODER_SUCCESS; - - if (data_size > 0) { - msgpack_unpacked_init(&source_map); - - offset = 0; - result = msgpack_unpack_next(&source_map, - data, - data_size, - &offset); - - if (result == MSGPACK_UNPACK_SUCCESS) { - result = FLB_EVENT_ENCODER_SUCCESS; - } - else { - result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; - } - - for (index = 0; - index < source_map.data.via.map.size && - result == FLB_EVENT_ENCODER_SUCCESS; - index++) { - key = source_map.data.via.map.ptr[index].key; - value = source_map.data.via.map.ptr[index].val; - - result = flb_log_event_encoder_append_body_msgpack_object( - encoder, - &key); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_msgpack_object( - encoder, - &value); - } - } - - msgpack_unpacked_destroy(&source_map); - } - - return result; -} - -int flb_tail_pack_line_map(struct flb_time *time, char **data, - size_t *data_size, struct flb_tail_file *file, - size_t processed_bytes) -{ - int result; - - result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->sl_log_event_encoder, time); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_tail_repack_map(file->sl_log_event_encoder, - *data, - *data_size); - } - - /* path_key */ - if (file->config->path_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), - FLB_LOG_EVENT_STRING_VALUE(file->orig_name, - file->orig_name_len)); - } - } - - /* offset_key */ - if (file->config->offset_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), - FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); - } - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); - } - else { - flb_log_event_encoder_rollback_record(file->sl_log_event_encoder); - } - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, "error packing event"); - - return -1; - } - - return 0; -} - -int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, - struct flb_tail_file *file, size_t processed_bytes) -{ - int result; - - result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->sl_log_event_encoder, time); - } - - /* path_key */ - if (file->config->path_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), - FLB_LOG_EVENT_STRING_VALUE(file->orig_name, - file->orig_name_len)); - } - } - - /* offset_key */ - if (file->config->offset_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), - FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); - } - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->key), - FLB_LOG_EVENT_STRING_VALUE(data, - data_size)); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); - } - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, "error packing event : %d", result); - - return -1; - } - - return 0; -} - -static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size) -{ - int result; - - result = flb_log_event_encoder_emit_raw_record( - file->ml_log_event_encoder, - buf_data, buf_size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, - "log event raw append error : %d", - result); - - return -1; - } - - return 0; -} - -static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ - if (file->ml_log_event_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, - file->tag_buf, - file->tag_len, - file->ml_log_event_encoder->output_buffer, - file->ml_log_event_encoder->output_length); - - flb_log_event_encoder_reset(file->ml_log_event_encoder); - } - - return 0; -} - -static int process_content(struct flb_tail_file *file, size_t *bytes) -{ - size_t len; - int lines = 0; - int ret; - size_t processed_bytes = 0; - char *data; - char *end; - char *p; - void *out_buf; - size_t out_size; - int crlf; - char *line; - size_t line_len; - char *repl_line; - size_t repl_line_len; - time_t now = time(NULL); - struct flb_time out_time = {0}; - struct flb_tail_config *ctx; - - ctx = (struct flb_tail_config *) file->config; - - /* Parse the data content */ - data = file->buf_data; - end = data + file->buf_len; - - /* reset last processed bytes */ - file->last_processed_bytes = 0; - - /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */ - while (data < end && *data == '\0') { - data++; - processed_bytes++; - } - - while (data < end && (p = memchr(data, '\n', end - data))) { - len = (p - data); - crlf = 0; - if (file->skip_next == FLB_TRUE) { - data += len + 1; - processed_bytes += len + 1; - file->skip_next = FLB_FALSE; - continue; - } - - /* - * Empty line (just breakline) - * --------------------------- - * [NOTE] with the new Multiline core feature and Multiline Filter on - * Fluent Bit v1.8.2, there are a couple of cases where stack traces - * or multi line patterns expects an empty line (meaning only the - * breakline), skipping empty lines on this plugin will break that - * functionality. - * - * We are introducing 'skip_empty_lines=off' configuration - * property to revert this behavior if some user is affected by - * this change. - */ - - if (len == 0 && ctx->skip_empty_lines) { - data++; - processed_bytes++; - continue; - } - - /* Process '\r\n' */ - if (len >= 2) { - crlf = (data[len-1] == '\r'); - if (len == 1 && crlf) { - data += 2; - processed_bytes += 2; - continue; - } - } - - /* Reset time for each line */ - flb_time_zero(&out_time); - - line = data; - line_len = len - crlf; - repl_line = NULL; - - if (ctx->ml_ctx) { - ret = flb_ml_append_text(ctx->ml_ctx, - file->ml_stream_id, - &out_time, - line, - line_len); - goto go_next; - } - else if (ctx->docker_mode) { - ret = flb_tail_dmode_process_content(now, line, line_len, - &repl_line, &repl_line_len, - file, ctx); - if (ret >= 0) { - if (repl_line == line) { - repl_line = NULL; - } - else { - line = repl_line; - line_len = repl_line_len; - } - /* Skip normal parsers flow */ - goto go_next; - } - else { - flb_tail_dmode_flush(file, ctx); - } - } - -#ifdef FLB_HAVE_PARSER - if (ctx->parser) { - /* Common parser (non-multiline) */ - ret = flb_parser_do(ctx->parser, line, line_len, - &out_buf, &out_size, &out_time); - if (ret >= 0) { - if (flb_time_to_nanosec(&out_time) == 0L) { - flb_time_get(&out_time); - } - - /* If multiline is enabled, flush any buffered data */ - if (ctx->multiline == FLB_TRUE) { - flb_tail_mult_flush(file, ctx); - } - - flb_tail_pack_line_map(&out_time, - (char**) &out_buf, &out_size, file, - processed_bytes); - - flb_free(out_buf); - } - else { - /* Parser failed, pack raw text */ - flb_tail_file_pack_line(NULL, data, len, file, processed_bytes); - } - } - else if (ctx->multiline == FLB_TRUE) { - ret = flb_tail_mult_process_content(now, - line, line_len, - file, ctx, processed_bytes); - - /* No multiline */ - if (ret == FLB_TAIL_MULT_NA) { - flb_tail_mult_flush(file, ctx); - - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); - } - else if (ret == FLB_TAIL_MULT_MORE) { - /* we need more data, do nothing */ - goto go_next; - } - else if (ret == FLB_TAIL_MULT_DONE) { - /* Finalized */ - } - } - else { - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); - } -#else - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); -#endif - - go_next: - flb_free(repl_line); - repl_line = NULL; - /* Adjust counters */ - data += len + 1; - processed_bytes += len + 1; - lines++; - file->parsed = 0; - file->last_processed_bytes += processed_bytes; - } - file->parsed = file->buf_len; - - if (lines > 0) { - /* Append buffer content to a chunk */ - *bytes = processed_bytes; - - if (file->sl_log_event_encoder->output_length > 0) { - flb_input_log_append_records(ctx->ins, - lines, - file->tag_buf, - file->tag_len, - file->sl_log_event_encoder->output_buffer, - file->sl_log_event_encoder->output_length); - - flb_log_event_encoder_reset(file->sl_log_event_encoder); - } - } - else if (file->skip_next) { - *bytes = file->buf_len; - } - else { - *bytes = processed_bytes; - } - - if (ctx->ml_ctx) { - ml_stream_buffer_flush(ctx, file); - } - - return lines; -} - -static inline void drop_bytes(char *buf, size_t len, int pos, int bytes) -{ - memmove(buf + pos, - buf + pos + bytes, - len - pos - bytes); -} - -#ifdef FLB_HAVE_REGEX -static void cb_results(const char *name, const char *value, - size_t vlen, void *data) -{ - struct flb_hash_table *ht = data; - - if (vlen == 0) { - return; - } - - flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen); -} -#endif - -#ifdef FLB_HAVE_REGEX -static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname, - char *out_buf, size_t *out_size, - struct flb_tail_config *ctx) -#else -static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size, - struct flb_tail_config *ctx) -#endif -{ - int i; - size_t len; - char *p; - size_t buf_s = 0; -#ifdef FLB_HAVE_REGEX - ssize_t n; - struct flb_regex_search result; - struct flb_hash_table *ht; - char *beg; - char *end; - int ret; - const char *tmp; - size_t tmp_s; -#endif - -#ifdef FLB_HAVE_REGEX - if (tag_regex) { - n = flb_regex_do(tag_regex, fname, strlen(fname), &result); - if (n <= 0) { - flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s", - fname); - return -1; - } - else { - ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, - FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE); - flb_regex_parse(tag_regex, &result, cb_results, ht); - - for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) { - if (beg != p) { - len = (beg - p); - memcpy(out_buf + buf_s, p, len); - buf_s += len; - } - - beg++; - - end = strchr(beg, '>'); - if (end && !memchr(beg, '<', end - beg)) { - end--; - - len = end - beg + 1; - ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s); - if (ret != -1) { - memcpy(out_buf + buf_s, tmp, tmp_s); - buf_s += tmp_s; - } - else { - memcpy(out_buf + buf_s, "_", 1); - buf_s++; - } - } - else { - flb_plg_error(ctx->ins, - "missing closing angle bracket in tag %s " - "at position %lu", tag, beg - tag); - flb_hash_table_destroy(ht); - return -1; - } - } - - flb_hash_table_destroy(ht); - if (*p) { - len = strlen(p); - memcpy(out_buf + buf_s, p, len); - buf_s += len; - } - } - } - else { -#endif - p = strchr(tag, '*'); - if (!p) { - return -1; - } - - /* Copy tag prefix if any */ - len = (p - tag); - if (len > 0) { - memcpy(out_buf, tag, len); - buf_s += len; - } - - /* Append file name */ - len = strlen(fname); - memcpy(out_buf + buf_s, fname, len); - buf_s += len; - - /* Tag suffix (if any) */ - p++; - if (*p) { - len = strlen(tag); - memcpy(out_buf + buf_s, p, (len - (p - tag))); - buf_s += (len - (p - tag)); - } - - /* Sanitize buffer */ - for (i = 0; i < buf_s; i++) { - if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') { - if (i > 0) { - out_buf[i] = '.'; - } - else { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - - if (i > 0 && out_buf[i] == '.') { - if (out_buf[i - 1] == '.') { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - else if (out_buf[i] == '*') { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - - /* Check for an ending '.' */ - if (out_buf[buf_s - 1] == '.') { - drop_bytes(out_buf, buf_s, buf_s - 1, 1); - buf_s--; - } -#ifdef FLB_HAVE_REGEX - } -#endif - - out_buf[buf_s] = '\0'; - *out_size = buf_s; - - return 0; -} - -static inline int flb_tail_file_exists(struct stat *st, - struct flb_tail_config *ctx) -{ - int ret; - uint64_t hash; - - ret = stat_to_hash_bits(ctx, st, &hash); - if (ret != 0) { - return -1; - } - - /* static hash */ - if (flb_hash_table_exists(ctx->static_hash, hash)) { - return FLB_TRUE; - } - - /* event hash */ - if (flb_hash_table_exists(ctx->event_hash, hash)) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* - * Based in the configuration or database offset, set the proper 'offset' for the - * file in question. - */ -static int set_file_position(struct flb_tail_config *ctx, - struct flb_tail_file *file) -{ - int64_t ret; - -#ifdef FLB_HAVE_SQLDB - /* - * If the database option is enabled, try to gather the file position. The - * database function updates the file->offset entry. - */ - if (ctx->db) { - ret = flb_tail_db_file_set(file, ctx); - if (ret == 0) { - if (file->offset > 0) { - ret = lseek(file->fd, file->offset, SEEK_SET); - if (ret == -1) { - flb_errno(); - return -1; - } - } - else if (ctx->read_from_head == FLB_FALSE) { - ret = lseek(file->fd, 0, SEEK_END); - if (ret == -1) { - flb_errno(); - return -1; - } - file->offset = ret; - flb_tail_db_file_offset(file, ctx); - } - return 0; - } - } -#endif - - if (ctx->read_from_head == FLB_TRUE) { - /* no need to seek, offset position is already zero */ - return 0; - } - - /* tail... */ - ret = lseek(file->fd, 0, SEEK_END); - if (ret == -1) { - flb_errno(); - return -1; - } - file->offset = ret; - - return 0; -} - -/* Multiline flush callback: invoked every time some content is complete */ -static int ml_flush_callback(struct flb_ml_parser *parser, - struct flb_ml_stream *mst, - void *data, char *buf_data, size_t buf_size) -{ - int result; - size_t mult_size = 0; - char *mult_buf = NULL; - struct flb_tail_file *file = data; - struct flb_tail_config *ctx = file->config; - - if (ctx->path_key == NULL && ctx->offset_key == NULL) { - ml_stream_buffer_append(file, buf_data, buf_size); - } - else { - /* adjust the records in a new buffer */ - result = record_append_custom_keys(file, - buf_data, - buf_size, - &mult_buf, - &mult_size); - - if (result < 0) { - ml_stream_buffer_append(file, buf_data, buf_size); - } - else { - ml_stream_buffer_append(file, mult_buf, mult_size); - - flb_free(mult_buf); - } - } - - if (mst->forced_flush) { - ml_stream_buffer_flush(ctx, file); - } - - return 0; -} - -int flb_tail_file_append(char *path, struct stat *st, int mode, - struct flb_tail_config *ctx) -{ - int fd; - int ret; - uint64_t stream_id; - uint64_t ts; - uint64_t hash_bits; - flb_sds_t hash_key; - size_t len; - char *tag; - char *name; - size_t tag_len; - struct flb_tail_file *file; - struct stat lst; - flb_sds_t inode_str; - - if (!S_ISREG(st->st_mode)) { - return -1; - } - - if (flb_tail_file_exists(st, ctx) == FLB_TRUE) { - return -1; - } - - fd = open(path, O_RDONLY); - if (fd == -1) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot open %s", path); - return -1; - } - - file = flb_calloc(1, sizeof(struct flb_tail_file)); - if (!file) { - flb_errno(); - goto error; - } - - /* Initialize */ - file->watch_fd = -1; - file->fd = fd; - - /* On non-windows environments check if the original path is a link */ - ret = lstat(path, &lst); - if (ret == 0) { - if (S_ISLNK(lst.st_mode)) { - file->is_link = FLB_TRUE; - file->link_inode = lst.st_ino; - } - } - - /* get unique hash for this file */ - ret = stat_to_hash_bits(ctx, st, &hash_bits); - if (ret != 0) { - flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path); - goto error; - } - file->hash_bits = hash_bits; - - /* store the hash key used for hash_bits */ - ret = stat_to_hash_key(ctx, st, &hash_key); - if (ret != 0) { - flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path); - goto error; - } - file->hash_key = hash_key; - - file->inode = st->st_ino; - file->offset = 0; - file->size = st->st_size; - file->buf_len = 0; - file->parsed = 0; - file->config = ctx; - file->tail_mode = mode; - file->tag_len = 0; - file->tag_buf = NULL; - file->rotated = 0; - file->pending_bytes = 0; - file->mult_firstline = FLB_FALSE; - file->mult_keys = 0; - file->mult_flush_timeout = 0; - file->mult_skipping = FLB_FALSE; - - /* - * Duplicate string into 'file' structure, the called function - * take cares to resolve real-name of the file in case we are - * running in a non-Linux system. - * - * Depending of the operating system, the way to obtain the file - * name associated to it file descriptor can have different behaviors - * specifically if it root path it's under a symbolic link. On Linux - * we can trust the file name but in others it's better to solve it - * with some extra calls. - */ - ret = flb_tail_file_name_dup(path, file); - if (!file->name) { - flb_errno(); - goto error; - } - - /* We keep a copy of the initial filename in orig_name. This is required - * for path_key to continue working after rotation. */ - file->orig_name = flb_strdup(file->name); - if (!file->orig_name) { - flb_free(file->name); - flb_errno(); - goto error; - } - file->orig_name_len = file->name_len; - - /* multiline msgpack buffers */ - file->mult_records = 0; - msgpack_sbuffer_init(&file->mult_sbuf); - msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, - msgpack_sbuffer_write); - - /* docker mode */ - file->dmode_flush_timeout = 0; - file->dmode_complete = true; - file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); - file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); - file->dmode_firstline = false; -#ifdef FLB_HAVE_SQLDB - file->db_id = 0; -#endif - file->skip_next = FLB_FALSE; - file->skip_warn = FLB_FALSE; - - /* Multiline core mode */ - if (ctx->ml_ctx) { - /* - * Create inode str to get stream_id. - * - * If stream_id is created by filename, - * it will be same after file rotation and it causes invalid destruction: - * - * - https://github.com/fluent/fluent-bit/issues/4190 - */ - inode_str = flb_sds_create_size(64); - flb_sds_printf(&inode_str, "%"PRIu64, file->inode); - - /* Create a stream for this file */ - ret = flb_ml_stream_create(ctx->ml_ctx, - inode_str, flb_sds_len(inode_str), - ml_flush_callback, file, - &stream_id); - if (ret != 0) { - flb_plg_error(ctx->ins, - "could not create multiline stream for file: %s", - inode_str); - flb_sds_destroy(inode_str); - goto error; - } - file->ml_stream_id = stream_id; - flb_sds_destroy(inode_str); - - /* - * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready - * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In - * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline - * without any previous buffering which leads to performance degradation. - * - * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish - * processing the "read() bytes". - */ - } - - /* Local buffer */ - file->buf_size = ctx->buf_chunk_size; - file->buf_data = flb_malloc(file->buf_size); - if (!file->buf_data) { - flb_errno(); - goto error; - } - - /* Initialize (optional) dynamic tag */ - if (ctx->dynamic_tag == FLB_TRUE) { - len = ctx->ins->tag_len + strlen(path) + 1; - tag = flb_malloc(len); - if (!tag) { - flb_errno(); - flb_plg_error(ctx->ins, "failed to allocate tag buffer"); - goto error; - } -#ifdef FLB_HAVE_REGEX - ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx); -#else - ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx); -#endif - if (ret == 0) { - file->tag_len = tag_len; - file->tag_buf = flb_strdup(tag); - } - flb_free(tag); - if (ret != 0) { - flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path); - goto error; - } - } - else { - file->tag_len = strlen(ctx->ins->tag); - file->tag_buf = flb_strdup(ctx->ins->tag); - } - if (!file->tag_buf) { - flb_plg_error(ctx->ins, "failed to set tag for file: %s", path); - flb_errno(); - goto error; - } - - if (mode == FLB_TAIL_STATIC) { - mk_list_add(&file->_head, &ctx->files_static); - ctx->files_static_count++; - flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - tail_signal_manager(file->config); - } - else if (mode == FLB_TAIL_EVENT) { - mk_list_add(&file->_head, &ctx->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - - /* Register this file into the fs_event monitoring */ - ret = flb_tail_fs_add(ctx, file); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not register file into fs_events"); - goto error; - } - } - - /* Set the file position (database offset, head or tail) */ - ret = set_file_position(ctx, file); - if (ret == -1) { - flb_tail_file_remove(file); - goto error; - } - - /* Remaining bytes to read */ - file->pending_bytes = file->size - file->offset; - -#ifdef FLB_HAVE_METRICS - name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); - - /* Old api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); -#endif - - file->sl_log_event_encoder = flb_log_event_encoder_create( - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (file->sl_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; - } - - file->ml_log_event_encoder = flb_log_event_encoder_create( - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (file->ml_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; - } - - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" with offset=%"PRId64" appended as %s", - file->inode, file->offset, path); - return 0; - -error: - if (file) { - if (file->buf_data) { - flb_free(file->buf_data); - } - if (file->name) { - flb_free(file->name); - } - flb_free(file); - } - close(fd); - - return -1; -} - -void flb_tail_file_remove(struct flb_tail_file *file) -{ - uint64_t ts; - char *name; - struct flb_tail_config *ctx; - - ctx = file->config; - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", - file->inode, file->name); - - /* remove the multiline.core stream */ - if (ctx->ml_ctx && file->ml_stream_id > 0) { - /* destroy ml stream */ - flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id); - } - - if (file->rotated > 0) { -#ifdef FLB_HAVE_SQLDB - /* - * Make sure to remove a the file entry from the database if the file - * was rotated and it's not longer being monitored. - */ - if (ctx->db) { - flb_tail_db_file_delete(file, file->config); - } -#endif - mk_list_del(&file->_rotate_head); - } - - msgpack_sbuffer_destroy(&file->mult_sbuf); - - if (file->sl_log_event_encoder != NULL) { - flb_log_event_encoder_destroy(file->sl_log_event_encoder); - } - - if (file->ml_log_event_encoder != NULL) { - flb_log_event_encoder_destroy(file->ml_log_event_encoder); - } - - flb_sds_destroy(file->dmode_buf); - flb_sds_destroy(file->dmode_lastline); - mk_list_del(&file->_head); - flb_tail_fs_remove(ctx, file); - - /* avoid deleting file with -1 fd */ - if (file->fd != -1) { - close(file->fd); - } - if (file->tag_buf) { - flb_free(file->tag_buf); - } - - /* remove any potential entry from the hash tables */ - flb_hash_table_del(ctx->static_hash, file->hash_key); - flb_hash_table_del(ctx->event_hash, file->hash_key); - - flb_free(file->buf_data); - flb_free(file->name); - flb_free(file->orig_name); - flb_free(file->real_name); - flb_sds_destroy(file->hash_key); - -#ifdef FLB_HAVE_METRICS - name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name}); - - /* old api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics); -#endif - - flb_free(file); -} - -int flb_tail_file_remove_all(struct flb_tail_config *ctx) -{ - int count = 0; - struct mk_list *head; - struct mk_list *tmp; - struct flb_tail_file *file; - - mk_list_foreach_safe(head, tmp, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - flb_tail_file_remove(file); - count++; - } - - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - flb_tail_file_remove(file); - count++; - } - - return count; -} - -static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ - int ret; - int64_t offset; - struct stat st; - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_errno(); - return FLB_TAIL_ERROR; - } - - /* Check if the file was truncated */ - if (file->offset > st.st_size) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return FLB_TAIL_ERROR; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", - file->inode, file->name); - file->offset = offset; - file->buf_len = 0; - - /* Update offset in the database file */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif - } - else { - file->size = st.st_size; - file->pending_bytes = (st.st_size - file->offset); - } - - return FLB_TAIL_OK; -} - -int flb_tail_file_chunk(struct flb_tail_file *file) -{ - int ret; - char *tmp; - size_t size; - size_t capacity; - size_t processed_bytes; - ssize_t bytes; - struct flb_tail_config *ctx; - - /* Check if we the engine issued a pause */ - ctx = file->config; - if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { - return FLB_TAIL_BUSY; - } - - capacity = (file->buf_size - file->buf_len) - 1; - if (capacity < 1) { - /* - * If there is no more room for more data, try to increase the - * buffer under the limit of buffer_max_size. - */ - if (file->buf_size >= ctx->buf_max_size) { - if (ctx->skip_long_lines == FLB_FALSE) { - flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " - "lines are too long. Skipping file.", file->name); - return FLB_TAIL_ERROR; - } - - /* Warn the user */ - if (file->skip_warn == FLB_FALSE) { - flb_plg_warn(ctx->ins, "file=%s have long lines. " - "Skipping long lines.", file->name); - file->skip_warn = FLB_TRUE; - } - - /* Do buffer adjustments */ - file->offset += file->buf_len; - file->buf_len = 0; - file->skip_next = FLB_TRUE; - } - else { - size = file->buf_size + ctx->buf_chunk_size; - if (size > ctx->buf_max_size) { - size = ctx->buf_max_size; - } - - /* Increase the buffer size */ - tmp = flb_realloc(file->buf_data, size); - if (tmp) { - flb_plg_trace(ctx->ins, "file=%s increase buffer size " - "%lu => %lu bytes", - file->name, file->buf_size, size); - file->buf_data = tmp; - file->buf_size = size; - } - else { - flb_errno(); - flb_plg_error(ctx->ins, "cannot increase buffer size for %s, " - "skipping file.", file->name); - return FLB_TAIL_ERROR; - } - } - capacity = (file->buf_size - file->buf_len) - 1; - } - - bytes = read(file->fd, file->buf_data + file->buf_len, capacity); - if (bytes > 0) { - /* we read some data, let the content processor take care of it */ - file->buf_len += bytes; - file->buf_data[file->buf_len] = '\0'; - - /* Now that we have some data in the buffer, call the data processor - * which aims to cut lines and register the entries into the engine. - * - * The returned value is the absolute offset the file must be seek - * now. It may need to get back a few bytes at the beginning of a new - * line. - */ - ret = process_content(file, &processed_bytes); - if (ret < 0) { - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", - file->inode, file->name); - return FLB_TAIL_ERROR; - } - - /* Adjust the file offset and buffer */ - file->offset += processed_bytes; - consume_bytes(file->buf_data, processed_bytes, file->buf_len); - file->buf_len -= processed_bytes; - file->buf_data[file->buf_len] = '\0'; - -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - flb_tail_db_file_offset(file, file->config); - } -#endif - - /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ - ret = adjust_counters(ctx, file); - - /* Data was consumed but likely some bytes still remain */ - return ret; - } - else if (bytes == 0) { - /* We reached the end of file, let's wait for some incoming data */ - ret = adjust_counters(ctx, file); - if (ret == FLB_TAIL_OK) { - return FLB_TAIL_WAIT; - } - else { - return FLB_TAIL_ERROR; - } - } - else { - /* error */ - flb_errno(); - flb_plg_error(ctx->ins, "error reading %s", file->name); - return FLB_TAIL_ERROR; - } - - return FLB_TAIL_ERROR; -} - -/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */ -int flb_tail_file_is_rotated(struct flb_tail_config *ctx, - struct flb_tail_file *file) -{ - int ret; - char *name; - struct stat st; - - /* - * Do not double-check already rotated files since the caller of this - * function will trigger a rotation. - */ - if (file->rotated != 0) { - return FLB_FALSE; - } - - /* Check if the 'original monitored file' is a link and rotated */ - if (file->is_link == FLB_TRUE) { - ret = lstat(file->name, &st); - if (ret == -1) { - /* Broken link or missing file */ - if (errno == ENOENT) { - flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s", - file->link_inode, file->name); - return FLB_TRUE; - } - else { - flb_errno(); - flb_plg_error(ctx->ins, - "link_inode=%"PRIu64" cannot detect if file: %s", - file->link_inode, file->name); - return -1; - } - } - else { - /* The file name is there, check if the same that we have */ - if (st.st_ino != file->link_inode) { - return FLB_TRUE; - } - } - } - - /* Retrieve the real file name, operating system lookup */ - name = flb_tail_file_name(file); - if (!name) { - flb_plg_error(ctx->ins, - "inode=%"PRIu64" cannot detect if file was rotated: %s", - file->inode, file->name); - return -1; - } - - - /* Get stats from the file name */ - ret = stat(name, &st); - if (ret == -1) { - flb_errno(); - flb_free(name); - return -1; - } - - /* Compare inodes and names */ - if (file->inode == st.st_ino && - flb_tail_target_file_name_cmp(name, file) == 0) { - flb_free(name); - return FLB_FALSE; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s", - file->inode, file->name, name); - - flb_free(name); - return FLB_TRUE; -} - -/* Promote a event in the static list to the dynamic 'events' interface */ -int flb_tail_file_to_event(struct flb_tail_file *file) -{ - int ret; - struct stat st; - struct flb_tail_config *ctx = file->config; - - /* Check if the file promoted have pending bytes */ - ret = fstat(file->fd, &st); - if (ret != 0) { - flb_errno(); - return -1; - } - - if (file->offset < st.st_size) { - file->pending_bytes = (st.st_size - file->offset); - tail_signal_pending(file->config); - } - else { - file->pending_bytes = 0; - } - - /* Check if the file has been rotated */ - ret = flb_tail_file_is_rotated(ctx, file); - if (ret == FLB_TRUE) { - flb_tail_file_rotated(file); - } - - /* Notify the fs-event handler that we will start monitoring this 'file' */ - ret = flb_tail_fs_add(ctx, file); - if (ret == -1) { - return -1; - } - - /* List swap: change from 'static' to 'event' list */ - mk_list_del(&file->_head); - ctx->files_static_count--; - flb_hash_table_del(ctx->static_hash, file->hash_key); - - mk_list_add(&file->_head, &file->config->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - - file->tail_mode = FLB_TAIL_EVENT; - - return 0; -} - -/* - * Given an open file descriptor, return the filename. This function is a - * bit slow and it aims to be used only when a file is rotated. - */ -char *flb_tail_file_name(struct flb_tail_file *file) -{ - int ret; - char *buf; -#ifdef __linux__ - ssize_t s; - char tmp[128]; -#elif defined(__APPLE__) - char path[PATH_MAX]; -#elif defined(FLB_SYSTEM_WINDOWS) - HANDLE h; -#elif defined(FLB_SYSTEM_FREEBSD) - struct kinfo_file *file_entries; - int file_count; - int file_index; -#endif - - buf = flb_malloc(PATH_MAX); - if (!buf) { - flb_errno(); - return NULL; - } - -#ifdef __linux__ - ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd); - if (ret == -1) { - flb_errno(); - flb_free(buf); - return NULL; - } - - s = readlink(tmp, buf, PATH_MAX); - if (s == -1) { - flb_free(buf); - flb_errno(); - return NULL; - } - buf[s] = '\0'; - -#elif __APPLE__ - int len; - - ret = fcntl(file->fd, F_GETPATH, path); - if (ret == -1) { - flb_errno(); - flb_free(buf); - return NULL; - } - - len = strlen(path); - memcpy(buf, path, len); - buf[len] = '\0'; - -#elif defined(FLB_SYSTEM_WINDOWS) - int len; - - h = (HANDLE) _get_osfhandle(file->fd); - if (h == INVALID_HANDLE_VALUE) { - flb_errno(); - flb_free(buf); - return NULL; - } - - /* This function returns the length of the string excluding "\0" - * and the resulting path has a "\\?\" prefix. - */ - len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED); - if (len == 0 || len >= PATH_MAX) { - flb_free(buf); - return NULL; - } - - if (strstr(buf, "\\\\?\\")) { - memmove(buf, buf + 4, len + 1); - } -#elif defined(FLB_SYSTEM_FREEBSD) - if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) { - flb_free(buf); - return NULL; - } - - for (file_index=0; file_index < file_count; file_index++) { - if (file_entries[file_index].kf_fd == file->fd) { - strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1); - buf[PATH_MAX - 1] = 0; - break; - } - } - free(file_entries); -#endif - return buf; -} - -int flb_tail_file_name_dup(char *path, struct flb_tail_file *file) -{ - file->name = flb_strdup(path); - if (!file->name) { - flb_errno(); - return -1; - } - file->name_len = strlen(file->name); - - if (file->real_name) { - flb_free(file->real_name); - } - - file->real_name = flb_tail_file_name(file); - if (!file->real_name) { - flb_errno(); - flb_free(file->name); - file->name = NULL; - return -1; - } - - return 0; -} - -/* Invoked every time a file was rotated */ -int flb_tail_file_rotated(struct flb_tail_file *file) -{ - int ret; - uint64_t ts; - char *name; - char *i_name; - char *tmp; - struct stat st; - struct flb_tail_config *ctx = file->config; - - /* Get the new file name */ - name = flb_tail_file_name(file); - if (!name) { - return -1; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s", - file->inode, file->name, name); - - /* Update local file entry */ - tmp = file->name; - flb_tail_file_name_dup(name, file); - flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s", - file->inode, tmp, file->name); - if (file->rotated == 0) { - file->rotated = time(NULL); - mk_list_add(&file->_rotate_head, &file->config->files_rotated); - - /* Rotate the file in the database */ -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - ret = flb_tail_db_file_rotate(name, file, file->config); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not rotate file %s->%s in database", - file->name, name); - } - } -#endif - -#ifdef FLB_HAVE_METRICS - i_name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name}); - - /* OLD api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED, - 1, file->config->ins->metrics); -#endif - - /* Check if a new file has been created */ - ret = stat(tmp, &st); - if (ret == 0 && st.st_ino != file->inode) { - if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) { - ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx); - if (ret == -1) { - flb_tail_scan(ctx->path_list, ctx); - } - else { - tail_signal_manager(file->config); - } - } - } - } - flb_free(tmp); - flb_free(name); - - return 0; -} - -static int check_purge_deleted_file(struct flb_tail_config *ctx, - struct flb_tail_file *file, time_t ts) -{ - int ret; - struct stat st; - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); - flb_tail_file_remove(file); - return FLB_TRUE; - } - - if (st.st_nlink == 0) { - flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s", - file->name); -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - /* Remove file entry from the database */ - flb_tail_db_file_delete(file, file->config); - } -#endif - /* Remove file from the monitored list */ - flb_tail_file_remove(file); - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* Purge rotated and deleted files */ -int flb_tail_file_purge(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - int ret; - int count = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - time_t now; - struct stat st; - - /* Rotated files */ - now = time(NULL); - mk_list_foreach_safe(head, tmp, &ctx->files_rotated) { - file = mk_list_entry(head, struct flb_tail_file, _rotate_head); - if ((file->rotated + ctx->rotate_wait) <= now) { - ret = fstat(file->fd, &st); - if (ret == 0) { - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" purge rotated file %s " \ - "(offset=%"PRId64" / size = %"PRIu64")", - file->inode, file->name, file->offset, (uint64_t)st.st_size); - if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) { - flb_plg_warn(ctx->ins, "purged rotated file while data " - "ingestion is paused, consider increasing " - "rotate_wait"); - } - } - else { - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")", - file->inode, file->name, file->offset); - } - - flb_tail_file_remove(file); - count++; - } - } - - /* - * Deleted files: under high load scenarios, exists the chances that in - * our event loop we miss some notifications about a file. In order to - * sanitize our list of monitored files we will iterate all of them and check - * if they have been deleted or not. - */ - mk_list_foreach_safe(head, tmp, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - check_purge_deleted_file(ctx, file, now); - } - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - check_purge_deleted_file(ctx, file, now); - } - - return count; -} diff --git a/fluent-bit/plugins/in_tail/tail_file.h b/fluent-bit/plugins/in_tail/tail_file.h deleted file mode 100644 index 796224c37..000000000 --- a/fluent-bit/plugins/in_tail/tail_file.h +++ /dev/null @@ -1,137 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_FILE_H -#define FLB_TAIL_FILE_H - -#include <sys/types.h> -#include <sys/stat.h> - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_input.h> - -#include "tail.h" -#include "tail_fs.h" -#include "tail_config.h" -#include "tail_file_internal.h" - -#ifdef FLB_SYSTEM_WINDOWS -#include "win32.h" -#endif - -#ifdef FLB_HAVE_REGEX -#define FLB_HASH_TABLE_SIZE 50 -#endif - -/* return the file modification time in seconds since epoch */ -static inline int64_t flb_tail_stat_mtime(struct stat *st) -{ -#if defined(FLB_HAVE_WINDOWS) - return (int64_t) st->st_mtime; -#elif defined(__APPLE__) && !defined(_POSIX_C_SOURCE) - return (int64_t) st->st_mtimespec.tv_sec; -#elif (_POSIX_C_SOURCE >= 200809L || \ - defined(_BSD_SOURCE) || defined(_SVID_SOURCE) || \ - defined(__BIONIC__) || (defined (__SVR4) && defined (__sun)) || \ - defined(__FreeBSD__) || defined (__linux__)) - return (int64_t) st->st_mtim.tv_sec; -#elif defined(_AIX) - return (int64_t) st->st_mtime; -#else - return (int64_t) st->st_mtime; -#endif - - /* backend unsupported: submit a PR :) */ - return -1; -} - -static inline int flb_tail_target_file_name_cmp(char *name, - struct flb_tail_file *file) -{ - int ret; - char *name_a = NULL; - char *name_b = NULL; - char *base_a = NULL; - char *base_b = NULL; - - name_a = flb_strdup(name); - if (!name_a) { - flb_errno(); - ret = -1; - goto out; - } - - base_a = flb_strdup(basename(name_a)); - if (!base_a) { - flb_errno(); - ret = -1; - goto out; - } - -#if defined(FLB_SYSTEM_WINDOWS) - name_b = flb_strdup(file->real_name); - if (!name_b) { - flb_errno(); - ret = -1; - goto out; - } - - base_b = basename(name_b); - ret = _stricmp(base_a, base_b); -#else - name_b = flb_strdup(file->real_name); - if (!name_b) { - flb_errno(); - ret = -1; - goto out; - } - base_b = basename(name_b); - ret = strcmp(base_a, base_b); -#endif - - out: - flb_free(name_a); - flb_free(name_b); - flb_free(base_a); - - /* FYI: 'base_b' never points to a new allocation, no flb_free is needed */ - - return ret; -} - -int flb_tail_file_name_dup(char *path, struct flb_tail_file *file); -int flb_tail_file_to_event(struct flb_tail_file *file); -int flb_tail_file_chunk(struct flb_tail_file *file); -int flb_tail_file_append(char *path, struct stat *st, int mode, - struct flb_tail_config *ctx); -void flb_tail_file_remove(struct flb_tail_file *file); -int flb_tail_file_remove_all(struct flb_tail_config *ctx); -char *flb_tail_file_name(struct flb_tail_file *file); -int flb_tail_file_is_rotated(struct flb_tail_config *ctx, - struct flb_tail_file *file); -int flb_tail_file_rotated(struct flb_tail_file *file); -int flb_tail_file_purge(struct flb_input_instance *ins, - struct flb_config *config, void *context); -int flb_tail_pack_line_map(struct flb_time *time, char **data, - size_t *data_size, struct flb_tail_file *file, - size_t processed_bytes); -int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, - struct flb_tail_file *file, size_t processed_bytes); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_file_internal.h b/fluent-bit/plugins/in_tail/tail_file_internal.h deleted file mode 100644 index 6d95c87c1..000000000 --- a/fluent-bit/plugins/in_tail/tail_file_internal.h +++ /dev/null @@ -1,130 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_INTERNAL_H -#define FLB_TAIL_INTERNAL_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_log_event_encoder.h> - -#ifdef FLB_HAVE_PARSER -#include <fluent-bit/multiline/flb_ml.h> -#endif - -#include "tail.h" -#include "tail_config.h" - -struct flb_tail_file { - /* Inotify */ - int watch_fd; - /* file lookup info */ - int fd; - int64_t size; - int64_t offset; - int64_t last_line; - uint64_t dev_id; - uint64_t inode; - uint64_t link_inode; - int is_link; - char *name; /* target file name given by scan routine */ - char *real_name; /* real file name in the file system */ - char *orig_name; /* original file name (before rotation) */ - size_t name_len; - size_t orig_name_len; - time_t rotated; - int64_t pending_bytes; - - /* dynamic tag for this file */ - int tag_len; - char *tag_buf; - - /* OLD multiline */ - time_t mult_flush_timeout; /* time when multiline started */ - int mult_firstline; /* bool: mult firstline found ? */ - int mult_firstline_append; /* bool: mult firstline appendable ? */ - int mult_skipping; /* skipping because ignode_older than ? */ - int mult_keys; /* total number of buffered keys */ - - int mult_records; /* multiline records counter mult_sbuf */ - msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */ - msgpack_packer mult_pck; /* temporary msgpack packer */ - struct flb_time mult_time; /* multiline time parsed from first line */ - - /* OLD docker mode */ - time_t dmode_flush_timeout; /* time when docker mode started */ - flb_sds_t dmode_buf; /* buffer for docker mode */ - flb_sds_t dmode_lastline; /* last incomplete line */ - bool dmode_complete; /* buffer contains completed log */ - bool dmode_firstline; /* dmode mult firstline found ? */ - - /* multiline engine: file stream_id and local buffers */ - uint64_t ml_stream_id; - - /* content parsing, positions and buffer */ - size_t parsed; - size_t buf_len; - size_t buf_size; - char *buf_data; - - /* - * This value represent the number of bytes procesed by process_content() - * in the last iteration. - */ - size_t last_processed_bytes; - - /* - * Long-lines handling: this flag is enabled when a previous line was - * too long and the buffer did not contain a \n, so when reaching the - * missing \n, skip that content and move forward. - * - * This flag is only set when Skip_Long_Lines is On. - */ - int skip_next; - - /* Did the plugin already warn the user about long lines ? */ - int skip_warn; - - /* Opaque data type for specific fs-event backend data */ - void *fs_backend; - - /* database reference */ - uint64_t db_id; - - uint64_t hash_bits; - flb_sds_t hash_key; - - /* There are dedicated log event encoders for - * single and multi line events because I am respecting - * the old behavior which resulted in grouping both types - * of logs in tail_file.c but I don't know if this is - * strictly necessary. - */ - struct flb_log_event_encoder *ml_log_event_encoder; - struct flb_log_event_encoder *sl_log_event_encoder; - - /* reference */ - int tail_mode; - struct flb_tail_config *config; - struct mk_list _head; - struct mk_list _rotate_head; -}; -#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs.h b/fluent-bit/plugins/in_tail/tail_fs.h deleted file mode 100644 index 948954333..000000000 --- a/fluent-bit/plugins/in_tail/tail_fs.h +++ /dev/null @@ -1,96 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_FS_H -#define FLB_TAIL_FS_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> - -#include "tail_config.h" -#include "tail_file_internal.h" - -#include "tail_fs_stat.h" -#ifdef FLB_HAVE_INOTIFY -#include "tail_fs_inotify.h" -#endif - -static inline int flb_tail_fs_init(struct flb_input_instance *in, - struct flb_tail_config *ctx, struct flb_config *config) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_init(in, ctx, config); - } -#endif - return flb_tail_fs_stat_init(in, ctx, config); -} - -static inline void flb_tail_fs_pause(struct flb_tail_config *ctx) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_pause(ctx); - } -#endif - return flb_tail_fs_stat_pause(ctx); -} - -static inline void flb_tail_fs_resume(struct flb_tail_config *ctx) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_resume(ctx); - } -#endif - return flb_tail_fs_stat_resume(ctx); -} - -static inline int flb_tail_fs_add(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_add(file); - } -#endif - return flb_tail_fs_stat_add(file); -} - -static inline int flb_tail_fs_remove(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_remove(file); - } -#endif - return flb_tail_fs_stat_remove(file); -} - -static inline int flb_tail_fs_exit(struct flb_tail_config *ctx) -{ -#ifdef FLB_HAVE_INOTIFY - if (ctx->inotify_watcher) { - return flb_tail_fs_inotify_exit(ctx); - } -#endif - return flb_tail_fs_stat_exit(ctx); -} - - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.c b/fluent-bit/plugins/in_tail/tail_fs_inotify.c deleted file mode 100644 index 59d10ca08..000000000 --- a/fluent-bit/plugins/in_tail/tail_fs_inotify.c +++ /dev/null @@ -1,433 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define _DEFAULT_SOURCE - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_input_plugin.h> - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <sys/inotify.h> - -#include "tail_config.h" -#include "tail_file.h" -#include "tail_db.h" -#include "tail_signal.h" - -#include <limits.h> -#include <fcntl.h> - -#include <sys/ioctl.h> - -static int debug_event_mask(struct flb_tail_config *ctx, - struct flb_tail_file *file, - uint32_t mask) -{ - flb_sds_t buf; - int buf_size = 256; - - /* Only enter this function if debug mode is allowed */ - if (flb_log_check(FLB_LOG_DEBUG) == 0) { - return 0; - } - - if (file) { - buf_size = file->name_len + 128; - } - - if (buf_size < 256) { - buf_size = 256; - } - - /* Create buffer */ - buf = flb_sds_create_size(buf_size); - if (!buf) { - return -1; - } - - /* Print info into sds */ - if (file) { - flb_sds_printf(&buf, "inode=%"PRIu64", %s, events: ", file->inode, file->name); - } - else { - flb_sds_printf(&buf, "events: "); - } - - if (mask & IN_ATTRIB) { - flb_sds_printf(&buf, "IN_ATTRIB "); - } - if (mask & IN_IGNORED) { - flb_sds_printf(&buf, "IN_IGNORED "); - } - if (mask & IN_MODIFY) { - flb_sds_printf(&buf, "IN_MODIFY "); - } - if (mask & IN_MOVE_SELF) { - flb_sds_printf(&buf, "IN_MOVE_SELF "); - } - if (mask & IN_Q_OVERFLOW) { - flb_sds_printf(&buf, "IN_Q_OVERFLOW "); - } - - flb_plg_debug(ctx->ins, "%s", buf); - flb_sds_destroy(buf); - - return 0; -} - -static int tail_fs_add(struct flb_tail_file *file, int check_rotated) -{ - int flags; - int watch_fd; - char *name; - struct flb_tail_config *ctx = file->config; - - /* - * If there is no watcher associated, we only want to monitor events if - * this file is rotated to somewhere. Note at this point we are polling - * lines from the file and once we reach EOF (and a watch_fd exists), - * we update the flags to receive notifications. - */ - flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW; - - if (check_rotated == FLB_TRUE) { - flags |= IN_MOVE_SELF; - } - - /* - * Double check real name of the file associated to the inode: - * - * Inotify interface in the Kernel uses the inode number as a real reference - * for the file we have opened. If for some reason the file we are pointing - * out in file->name has been rotated and not been updated, we might not add - * the watch to the real file we aim to. - * - * A case like this can generate the issue: - * - * 1. inode=1 : file a.log is being watched - * 2. inode=1 : file a.log is rotated to a.log.1, but notification not - * delivered yet. - * 3. inode=2 : new file 'a.log' is created - * 4. inode=2 : the scan_path routine discover the new 'a.log' file - * 5. inode=2 : add an inotify watcher for 'a.log' - * 6. conflict: inotify_add_watch() receives the path 'a.log', - */ - - name = flb_tail_file_name(file); - if (!name) { - flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot get real filename for inotify", - file->inode); - return -1; - } - - /* Register or update the flags */ - watch_fd = inotify_add_watch(ctx->fd_notify, name, flags); - flb_free(name); - - if (watch_fd == -1) { - flb_errno(); - if (errno == ENOSPC) { - flb_plg_error(ctx->ins, "inotify: The user limit on the total " - "number of inotify watches was reached or the kernel " - "failed to allocate a needed resource (ENOSPC)"); - } - return -1; - } - file->watch_fd = watch_fd; - flb_plg_info(ctx->ins, "inotify_fs_add(): inode=%"PRIu64" watch_fd=%i name=%s", - file->inode, watch_fd, file->name); - return 0; -} - -static int flb_tail_fs_add_rotated(struct flb_tail_file *file) -{ - return tail_fs_add(file, FLB_FALSE); -} - -static int tail_fs_event(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - int64_t offset; - struct mk_list *head; - struct mk_list *tmp; - struct flb_tail_config *ctx = in_context; - struct flb_tail_file *file = NULL; - struct inotify_event ev; - struct stat st; - - /* Read the event */ - ret = read(ctx->fd_notify, &ev, sizeof(struct inotify_event)); - if (ret < 1) { - return -1; - } - - /* Lookup watched file */ - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - if (file->watch_fd != ev.wd) { - file = NULL; - continue; - } - break; - } - - if (!file) { - return -1; - } - - /* Debug event */ - debug_event_mask(ctx, file, ev.mask); - - if (ev.mask & IN_IGNORED) { - flb_plg_debug(ctx->ins, "inode=%"PRIu64" watch_fd=%i IN_IGNORED", - file->inode, ev.wd); - return -1; - } - - /* Check file rotation (only if it has not been rotated before) */ - if (ev.mask & IN_MOVE_SELF && file->rotated == 0) { - flb_plg_debug(ins, "inode=%"PRIu64" rotated IN_MOVE SELF '%s'", - file->inode, file->name); - - /* A rotated file must be re-registered */ - flb_tail_file_rotated(file); - flb_tail_fs_remove(ctx, file); - flb_tail_fs_add_rotated(file); - } - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing", - file->inode, file->name); - flb_tail_file_remove(file); - return 0; - } - file->size = st.st_size; - file->pending_bytes = (file->size - file->offset); - - /* File was removed ? */ - if (ev.mask & IN_ATTRIB) { - /* Check if the file have been deleted */ - if (st.st_nlink == 0) { - flb_plg_debug(ins, "inode=%"PRIu64" file has been deleted: %s", - file->inode, file->name); - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - /* Remove file entry from the database */ - flb_tail_db_file_delete(file, ctx); - } -#endif - /* Remove file from the monitored list */ - flb_tail_file_remove(file); - return 0; - } - } - - if (ev.mask & IN_MODIFY) { - /* - * The file was modified, check how many new bytes do - * we have. - */ - - /* Check if the file was truncated */ - if (file->offset > st.st_size) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return -1; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", - file->inode, file->name); - file->offset = offset; - file->buf_len = 0; - - /* Update offset in the database file */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif - } - } - - /* Collect the data */ - ret = in_tail_collect_event(file, config); - if (ret != FLB_TAIL_ERROR) { - /* - * Due to read buffer size capacity, there are some cases where the - * read operation cannot consume all new data available on one - * round; upon successfull read(2) some data can still remain. - * - * If that is the case, we set in the structure how - * many bytes are available 'now', so then the further - * routine that check pending bytes and then the inotified-file - * can process them properly after an internal signal. - * - * The goal to defer this routine is to avoid a blocking - * read(2) operation, that might kill performance. Just let's - * wait a second and do a good job. - */ - tail_signal_pending(ctx); - } - else { - return ret; - } - - return 0; -} - -static int in_tail_progress_check_callback(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - int ret = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_config *ctx = context; - struct flb_tail_file *file; - int pending_data_detected; - struct stat st; - - (void) config; - - pending_data_detected = FLB_FALSE; - - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - if (file->offset < file->size) { - pending_data_detected = FLB_TRUE; - - continue; - } - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_errno(); - flb_plg_error(ins, "fstat error"); - - continue; - } - - if (file->offset < st.st_size) { - file->size = st.st_size; - file->pending_bytes = (file->size - file->offset); - - pending_data_detected = FLB_TRUE; - } - } - - if (pending_data_detected) { - tail_signal_pending(ctx); - } - - return 0; -} - -/* File System events based on Inotify(2). Linux >= 2.6.32 is suggested */ -int flb_tail_fs_inotify_init(struct flb_input_instance *in, - struct flb_tail_config *ctx, struct flb_config *config) -{ - int fd; - int ret; - - flb_plg_debug(ctx->ins, "flb_tail_fs_inotify_init() initializing inotify tail input"); - - /* Create inotify instance */ - fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); - if (fd == -1) { - flb_errno(); - return -1; - } - flb_plg_debug(ctx->ins, "inotify watch fd=%i", fd); - ctx->fd_notify = fd; - - /* This backend use Fluent Bit event-loop to trigger notifications */ - ret = flb_input_set_collector_event(in, tail_fs_event, - ctx->fd_notify, config); - if (ret < 0) { - close(fd); - return -1; - } - ctx->coll_fd_fs1 = ret; - - /* Register callback to check current tail offsets */ - ret = flb_input_set_collector_time(in, in_tail_progress_check_callback, - ctx->progress_check_interval, - ctx->progress_check_interval_nsec, - config); - if (ret == -1) { - flb_tail_config_destroy(ctx); - return -1; - } - ctx->coll_fd_progress_check = ret; - - return 0; -} - -void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx) -{ - flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins); -} - -void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx) -{ - flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins); -} - -int flb_tail_fs_inotify_add(struct flb_tail_file *file) -{ - int ret; - struct flb_tail_config *ctx = file->config; - - ret = tail_fs_add(file, FLB_TRUE); - if (ret == -1) { - flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot register file %s", - file->inode, file->name); - return -1; - } - - return 0; -} - -int flb_tail_fs_inotify_remove(struct flb_tail_file *file) -{ - struct flb_tail_config *ctx = file->config; - - if (file->watch_fd == -1) { - return 0; - } - - flb_plg_info(ctx->ins, "inotify_fs_remove(): inode=%"PRIu64" watch_fd=%i", - file->inode, file->watch_fd); - - inotify_rm_watch(file->config->fd_notify, file->watch_fd); - file->watch_fd = -1; - return 0; -} - -int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx) -{ - return close(ctx->fd_notify); -} diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.h b/fluent-bit/plugins/in_tail/tail_fs_inotify.h deleted file mode 100644 index 128ab0624..000000000 --- a/fluent-bit/plugins/in_tail/tail_fs_inotify.h +++ /dev/null @@ -1,37 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_FS_INOTIFY_H -#define FLB_TAIL_FS_INOTIFY_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> - -#include "tail_config.h" -#include "tail_file_internal.h" - -int flb_tail_fs_inotify_init(struct flb_input_instance *in, - struct flb_tail_config *ctx, struct flb_config *config); -int flb_tail_fs_inotify_add(struct flb_tail_file *file); -int flb_tail_fs_inotify_remove(struct flb_tail_file *file); -int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx); -void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx); -void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.c b/fluent-bit/plugins/in_tail/tail_fs_stat.c deleted file mode 100644 index 6b312c9bd..000000000 --- a/fluent-bit/plugins/in_tail/tail_fs_stat.c +++ /dev/null @@ -1,253 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define _DEFAULT_SOURCE - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> - -#include <sys/types.h> -#include <sys/stat.h> - -#include "tail_file.h" -#include "tail_db.h" -#include "tail_config.h" -#include "tail_signal.h" - -#ifdef FLB_SYSTEM_WINDOWS -#include "win32.h" -#endif - -struct fs_stat { - /* last time check */ - time_t checked; - - /* previous status */ - struct stat st; -}; - -static int tail_fs_event(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - struct mk_list *head; - struct mk_list *tmp; - struct flb_tail_config *ctx = in_context; - struct flb_tail_file *file = NULL; - struct fs_stat *fst; - struct stat st; - time_t t; - - t = time(NULL); - - /* Lookup watched file */ - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - fst = file->fs_backend; - - /* Check current status of the file */ - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_errno(); - continue; - } - - /* Check if the file was modified */ - if ((fst->st.st_mtime != st.st_mtime) || - (fst->st.st_size != st.st_size)) { - /* Update stat info and trigger the notification */ - memcpy(&fst->st, &st, sizeof(struct stat)); - fst->checked = t; - in_tail_collect_event(file, config); - } - } - - return 0; -} - -static int tail_fs_check(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - int64_t offset; - char *name; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_config *ctx = in_context; - struct flb_tail_file *file = NULL; - struct fs_stat *fst; - struct stat st; - - /* Lookup watched file */ - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - fst = file->fs_backend; - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); - flb_tail_file_remove(file); - continue; - } - - /* Check if the file have been deleted */ - if (st.st_nlink == 0) { - flb_plg_debug(ctx->ins, "file has been deleted: %s", file->name); -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - /* Remove file entry from the database */ - flb_tail_db_file_delete(file, ctx); - } -#endif - flb_tail_file_remove(file); - continue; - } - - /* Check if the file was truncated */ - if (file->offset > st.st_size) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return -1; - } - - flb_plg_debug(ctx->ins, "file truncated %s", file->name); - file->offset = offset; - file->buf_len = 0; - memcpy(&fst->st, &st, sizeof(struct stat)); - -#ifdef FLB_HAVE_SQLDB - /* Update offset in database file */ - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif - } - - if (file->offset < st.st_size) { - file->pending_bytes = (st.st_size - file->offset); - tail_signal_pending(ctx); - } - else { - file->pending_bytes = 0; - } - - - /* Discover the current file name for the open file descriptor */ - name = flb_tail_file_name(file); - if (!name) { - flb_plg_debug(ctx->ins, "could not resolve %s, removing", file->name); - flb_tail_file_remove(file); - continue; - } - - /* - * Check if file still exists. This method requires explicity that the - * user is using an absolute path, otherwise we will be rotating the - * wrong file. - * - * flb_tail_target_file_name_cmp is a deeper compare than - * flb_tail_file_name_cmp. If applicable, it compares to the underlying - * real_name of the file. - */ - if (flb_tail_file_is_rotated(ctx, file) == FLB_TRUE) { - flb_tail_file_rotated(file); - } - flb_free(name); - - } - - return 0; -} - -/* File System events based on stat(2) */ -int flb_tail_fs_stat_init(struct flb_input_instance *in, - struct flb_tail_config *ctx, struct flb_config *config) -{ - int ret; - - flb_plg_debug(ctx->ins, "flb_tail_fs_stat_init() initializing stat tail input"); - - /* Set a manual timer to collect events every 0.250 seconds */ - ret = flb_input_set_collector_time(in, tail_fs_event, - 0, 250000000, config); - if (ret < 0) { - return -1; - } - ctx->coll_fd_fs1 = ret; - - /* Set a manual timer to check deleted/rotated files every 2.5 seconds */ - ret = flb_input_set_collector_time(in, tail_fs_check, - 2, 500000000, config); - if (ret < 0) { - return -1; - } - ctx->coll_fd_fs2 = ret; - - return 0; -} - -void flb_tail_fs_stat_pause(struct flb_tail_config *ctx) -{ - flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins); - flb_input_collector_pause(ctx->coll_fd_fs2, ctx->ins); -} - -void flb_tail_fs_stat_resume(struct flb_tail_config *ctx) -{ - flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins); - flb_input_collector_resume(ctx->coll_fd_fs2, ctx->ins); -} - -int flb_tail_fs_stat_add(struct flb_tail_file *file) -{ - int ret; - struct fs_stat *fst; - - fst = flb_malloc(sizeof(struct fs_stat)); - if (!fst) { - flb_errno(); - return -1; - } - - fst->checked = time(NULL); - ret = stat(file->name, &fst->st); - if (ret == -1) { - flb_errno(); - flb_free(fst); - return -1; - } - file->fs_backend = fst; - - return 0; -} - -int flb_tail_fs_stat_remove(struct flb_tail_file *file) -{ - if (file->tail_mode == FLB_TAIL_EVENT) { - flb_free(file->fs_backend); - } - return 0; -} - -int flb_tail_fs_stat_exit(struct flb_tail_config *ctx) -{ - (void) ctx; - return 0; -} diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.h b/fluent-bit/plugins/in_tail/tail_fs_stat.h deleted file mode 100644 index 21a0704cb..000000000 --- a/fluent-bit/plugins/in_tail/tail_fs_stat.h +++ /dev/null @@ -1,37 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_FS_STAT_H -#define FLB_TAIL_FS_STAT_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> - -#include "tail_config.h" -#include "tail_file_internal.h" - -int flb_tail_fs_stat_init(struct flb_input_instance *in, - struct flb_tail_config *ctx, struct flb_config *config); -int flb_tail_fs_stat_add(struct flb_tail_file *file); -int flb_tail_fs_stat_remove(struct flb_tail_file *file); -int flb_tail_fs_stat_exit(struct flb_tail_config *ctx); -void flb_tail_fs_stat_pause(struct flb_tail_config *ctx); -void flb_tail_fs_stat_resume(struct flb_tail_config *ctx); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_multiline.c b/fluent-bit/plugins/in_tail/tail_multiline.c deleted file mode 100644 index 71c031014..000000000 --- a/fluent-bit/plugins/in_tail/tail_multiline.c +++ /dev/null @@ -1,606 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_config.h> -#include <fluent-bit/flb_kv.h> - -#include "tail_config.h" -#include "tail_multiline.h" - -static int tail_mult_append(struct flb_parser *parser, - struct flb_tail_config *ctx) -{ - struct flb_tail_mult *mp; - - mp = flb_malloc(sizeof(struct flb_tail_mult)); - if (!mp) { - flb_errno(); - return -1; - } - - mp->parser = parser; - mk_list_add(&mp->_head, &ctx->mult_parsers); - - return 0; -} - -int flb_tail_mult_create(struct flb_tail_config *ctx, - struct flb_input_instance *ins, - struct flb_config *config) -{ - int ret; - const char *tmp; - struct mk_list *head; - struct flb_parser *parser; - struct flb_kv *kv; - - if (ctx->multiline_flush <= 0) { - ctx->multiline_flush = 1; - } - - mk_list_init(&ctx->mult_parsers); - - /* Get firstline parser */ - tmp = flb_input_get_property("parser_firstline", ins); - if (!tmp) { - flb_plg_error(ctx->ins, "multiline: no parser defined for firstline"); - return -1; - } - parser = flb_parser_get(tmp, config); - if (!parser) { - flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp); - return -1; - } - - ctx->mult_parser_firstline = parser; - - /* Read all multiline rules */ - mk_list_foreach(head, &ins->properties) { - kv = mk_list_entry(head, struct flb_kv, _head); - if (strcasecmp("parser_firstline", kv->key) == 0) { - continue; - } - - if (strncasecmp("parser_", kv->key, 7) == 0) { - parser = flb_parser_get(kv->val, config); - if (!parser) { - flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val); - return -1; - } - - ret = tail_mult_append(parser, ctx); - if (ret == -1) { - return -1; - } - } - } - - return 0; -} - -int flb_tail_mult_destroy(struct flb_tail_config *ctx) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_mult *mp; - - if (ctx->multiline == FLB_FALSE) { - return 0; - } - - mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) { - mp = mk_list_entry(head, struct flb_tail_mult, _head); - mk_list_del(&mp->_head); - flb_free(mp); - } - - return 0; -} - -/* Process the result of a firstline match */ -int flb_tail_mult_process_first(time_t now, - char *buf, size_t size, - struct flb_time *out_time, - struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - size_t off; - msgpack_object map; - msgpack_unpacked result; - - /* If a previous multiline context already exists, flush first */ - if (file->mult_firstline && !file->mult_skipping) { - flb_tail_mult_flush(file, ctx); - } - - /* Remark as first multiline message */ - file->mult_firstline = FLB_TRUE; - - /* Validate obtained time, if not set, set the current time */ - if (flb_time_to_nanosec(out_time) == 0L) { - flb_time_get(out_time); - } - - /* Should we skip this multiline record ? */ - if (ctx->ignore_older > 0) { - if ((now - ctx->ignore_older) > out_time->tm.tv_sec) { - flb_free(buf); - file->mult_skipping = FLB_TRUE; - file->mult_firstline = FLB_TRUE; - - /* we expect more data to skip */ - return FLB_TAIL_MULT_MORE; - } - } - - /* Re-initiate buffers */ - msgpack_sbuffer_init(&file->mult_sbuf); - msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write); - - /* - * flb_parser_do() always return a msgpack buffer, so we tweak our - * local msgpack reference to avoid an extra allocation. The only - * concern is that we don't know what's the real size of the memory - * allocated, so we assume it's just 'out_size'. - */ - file->mult_flush_timeout = now + (ctx->multiline_flush - 1); - file->mult_sbuf.data = buf; - file->mult_sbuf.size = size; - file->mult_sbuf.alloc = size; - - /* Set multiline status */ - file->mult_firstline = FLB_TRUE; - file->mult_skipping = FLB_FALSE; - flb_time_copy(&file->mult_time, out_time); - - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - msgpack_sbuffer_destroy(&file->mult_sbuf); - msgpack_unpacked_destroy(&result); - return FLB_TAIL_MULT_NA; - } - - map = result.data; - file->mult_keys = map.via.map.size; - msgpack_unpacked_destroy(&result); - - /* We expect more data */ - return FLB_TAIL_MULT_MORE; -} - -/* Append a raw log entry to the last structured field in the mult buffer */ -static inline void flb_tail_mult_append_raw(char *buf, int size, - struct flb_tail_file *file, - struct flb_tail_config *config) -{ - /* Append the raw string */ - msgpack_pack_str(&file->mult_pck, size); - msgpack_pack_str_body(&file->mult_pck, buf, size); -} - -/* Check if the last key value type of a map is string or not */ -static inline int is_last_key_val_string(char *buf, size_t size) -{ - int ret = FLB_FALSE; - size_t off; - msgpack_unpacked result; - msgpack_object v; - msgpack_object root; - - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - return ret; - } - - root = result.data; - if (root.type != MSGPACK_OBJECT_MAP) { - ret = FLB_FALSE; - } - else { - if (root.via.map.size == 0) { - ret = FLB_FALSE; - } - else { - v = root.via.map.ptr[root.via.map.size - 1].val; - if (v.type == MSGPACK_OBJECT_STR) { - ret = FLB_TRUE; - } - } - } - - msgpack_unpacked_destroy(&result); - return ret; -} - -int flb_tail_mult_process_content(time_t now, - char *buf, size_t len, - struct flb_tail_file *file, - struct flb_tail_config *ctx, - size_t processed_bytes) -{ - int ret; - size_t off; - void *out_buf; - size_t out_size = 0; - struct mk_list *head; - struct flb_tail_mult *mult_parser = NULL; - struct flb_time out_time = {0}; - msgpack_object map; - msgpack_unpacked result; - - /* Always check if this line is the beginning of a new multiline message */ - ret = flb_parser_do(ctx->mult_parser_firstline, - buf, len, - &out_buf, &out_size, &out_time); - if (ret >= 0) { - /* - * The content is a candidate for a firstline, but we need to perform - * the extra-mandatory check where the last key value type must be - * a string, otherwise no string concatenation with continuation lines - * will be possible. - */ - ret = is_last_key_val_string(out_buf, out_size); - if (ret == FLB_TRUE) - file->mult_firstline_append = FLB_TRUE; - else - file->mult_firstline_append = FLB_FALSE; - - flb_tail_mult_process_first(now, out_buf, out_size, &out_time, - file, ctx); - return FLB_TAIL_MULT_MORE; - } - - if (file->mult_skipping == FLB_TRUE) { - return FLB_TAIL_MULT_MORE; - } - - /* - * Once here means we have some data that is a continuation, iterate - * parsers trying to find a match - */ - out_buf = NULL; - mk_list_foreach(head, &ctx->mult_parsers) { - mult_parser = mk_list_entry(head, struct flb_tail_mult, _head); - - /* Process line text with current parser */ - out_buf = NULL; - out_size = 0; - ret = flb_parser_do(mult_parser->parser, - buf, len, - &out_buf, &out_size, &out_time); - if (ret < 0) { - mult_parser = NULL; - continue; - } - - /* The line was processed, break the loop and buffer the data */ - break; - } - - if (!mult_parser) { - /* - * If no parser was found means the string log must be appended - * to the last structured field. - */ - if (file->mult_firstline && file->mult_firstline_append) { - flb_tail_mult_append_raw(buf, len, file, ctx); - } - else { - flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes); - } - - return FLB_TAIL_MULT_MORE; - } - - off = 0; - msgpack_unpacked_init(&result); - msgpack_unpack_next(&result, out_buf, out_size, &off); - map = result.data; - - /* Append new map to our local msgpack buffer */ - file->mult_keys += map.via.map.size; - msgpack_unpacked_destroy(&result); - msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size); - flb_free(out_buf); - - return FLB_TAIL_MULT_MORE; -} - -static int flb_tail_mult_pack_line_body( - struct flb_log_event_encoder *context, - struct flb_tail_file *file) -{ - size_t adjacent_object_offset; - size_t continuation_length; - msgpack_unpacked adjacent_object; - msgpack_unpacked current_object; - size_t entry_index; - msgpack_object entry_value; - msgpack_object entry_key; - msgpack_object_map *data_map; - int map_size; - size_t offset; - struct flb_tail_config *config; - int result; - - result = FLB_EVENT_ENCODER_SUCCESS; - config = (struct flb_tail_config *) file->config; - - /* New Map size */ - map_size = file->mult_keys; - - if (file->config->path_key != NULL) { - map_size++; - - result = flb_log_event_encoder_append_body_values( - context, - FLB_LOG_EVENT_CSTRING_VALUE(config->path_key), - FLB_LOG_EVENT_CSTRING_VALUE(file->name)); - } - - - msgpack_unpacked_init(¤t_object); - msgpack_unpacked_init(&adjacent_object); - - offset = 0; - - while (result == FLB_EVENT_ENCODER_SUCCESS && - msgpack_unpack_next(¤t_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &offset) == MSGPACK_UNPACK_SUCCESS) { - if (current_object.data.type != MSGPACK_OBJECT_MAP) { - continue; - } - - data_map = ¤t_object.data.via.map; - - continuation_length = 0; - - for (entry_index = 0; entry_index < data_map->size; entry_index++) { - entry_key = data_map->ptr[entry_index].key; - entry_value = data_map->ptr[entry_index].val; - - result = flb_log_event_encoder_append_body_msgpack_object(context, - &entry_key); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - /* Check if this is the last entry in the map and if that is - * the case then add the lengths of all the trailing string - * objects after the map in order to append them to the value - * but only if the value object is a string - */ - if (entry_index + 1 == data_map->size && - entry_value.type == MSGPACK_OBJECT_STR) { - adjacent_object_offset = offset; - - while (msgpack_unpack_next( - &adjacent_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { - if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { - break; - } - - /* Sum total bytes to append */ - continuation_length += adjacent_object.data.via.str.size + 1; - } - - result = flb_log_event_encoder_append_body_string_length( - context, - entry_value.via.str.size + - continuation_length); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - (char *) entry_value.via.str.ptr, - entry_value.via.str.size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - if (continuation_length > 0) { - adjacent_object_offset = offset; - - while (msgpack_unpack_next( - &adjacent_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { - if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - "\n", - 1); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - (char *) adjacent_object.data.via.str.ptr, - adjacent_object.data.via.str.size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - } - } - } - else { - result = flb_log_event_encoder_append_body_msgpack_object(context, - &entry_value); - } - } - } - - msgpack_unpacked_destroy(¤t_object); - msgpack_unpacked_destroy(&adjacent_object); - - /* Reset status */ - file->mult_firstline = FLB_FALSE; - file->mult_skipping = FLB_FALSE; - file->mult_keys = 0; - file->mult_flush_timeout = 0; - - msgpack_sbuffer_destroy(&file->mult_sbuf); - - file->mult_sbuf.data = NULL; - - flb_time_zero(&file->mult_time); - - return result; -} - -/* Flush any multiline context data into outgoing buffers */ -int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) -{ - int result; - - /* nothing to flush */ - if (file->mult_firstline == FLB_FALSE) { - return -1; - } - - if (file->mult_keys == 0) { - return -1; - } - - result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->ml_log_event_encoder, &file->mult_time); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_tail_mult_pack_line_body( - file->ml_log_event_encoder, - file); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record( - file->ml_log_event_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(ctx->ins, - file->tag_buf, - file->tag_len, - file->ml_log_event_encoder->output_buffer, - file->ml_log_event_encoder->output_length); - result = 0; - } - else { - flb_plg_error(file->config->ins, "error packing event : %d", result); - - result = -1; - } - - flb_log_event_encoder_reset(file->ml_log_event_encoder); - - return result; -} - -static void file_pending_flush(struct flb_tail_config *ctx, - struct flb_tail_file *file, time_t now) -{ - if (file->mult_flush_timeout > now) { - return; - } - - if (file->mult_firstline == FLB_FALSE) { - if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) { - return; - } - } - - flb_tail_mult_flush(file, ctx); -} - -int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx) -{ - time_t expired; - struct mk_list *head; - struct flb_tail_file *file; - - expired = time(NULL) + 3600; - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - return 0; -} - -int flb_tail_mult_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - time_t now; - struct mk_list *head; - struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - - now = time(NULL); - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - file_pending_flush(ctx, file, now); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - file_pending_flush(ctx, file, now); - } - - return 0; -} diff --git a/fluent-bit/plugins/in_tail/tail_multiline.h b/fluent-bit/plugins/in_tail/tail_multiline.h deleted file mode 100644 index d7f7539b1..000000000 --- a/fluent-bit/plugins/in_tail/tail_multiline.h +++ /dev/null @@ -1,57 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_TAIL_MULT_H -#define FLB_TAIL_TAIL_MULT_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> - -#include "tail_config.h" -#include "tail_file.h" - -#define FLB_TAIL_MULT_NA -1 /* not applicable as a multiline stream */ -#define FLB_TAIL_MULT_DONE 0 /* finished a multiline stream */ -#define FLB_TAIL_MULT_MORE 1 /* expect more lines to come */ -#define FLB_TAIL_MULT_FLUSH "4" /* max flush time for multiline: 4 seconds */ - -struct flb_tail_mult { - struct flb_parser *parser; - struct mk_list _head; -}; - -int flb_tail_mult_create(struct flb_tail_config *ctx, - struct flb_input_instance *ins, - struct flb_config *config); - -int flb_tail_mult_destroy(struct flb_tail_config *ctx); - -int flb_tail_mult_process_content(time_t now, - char *buf, size_t len, - struct flb_tail_file *file, - struct flb_tail_config *ctx, - size_t processed_bytes); -int flb_tail_mult_flush(struct flb_tail_file *file, - struct flb_tail_config *ctx); - -int flb_tail_mult_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context); -int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_scan.c b/fluent-bit/plugins/in_tail/tail_scan.c deleted file mode 100644 index ccb8e070a..000000000 --- a/fluent-bit/plugins/in_tail/tail_scan.c +++ /dev/null @@ -1,71 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_input_plugin.h> -#include "tail.h" -#include "tail_config.h" - -/* - * Include proper scan backend - */ -#ifdef FLB_SYSTEM_WINDOWS -#include "tail_scan_win32.c" -#else -#include "tail_scan_glob.c" -#endif - -int flb_tail_scan(struct mk_list *path_list, struct flb_tail_config *ctx) -{ - int ret; - struct mk_list *head; - struct flb_slist_entry *pattern; - - mk_list_foreach(head, path_list) { - pattern = mk_list_entry(head, struct flb_slist_entry, _head); - ret = tail_scan_path(pattern->str, ctx); - if (ret == -1) { - flb_plg_warn(ctx->ins, "error scanning path: %s", pattern->str); - } - else { - flb_plg_debug(ctx->ins, "%i new files found on path '%s'", - ret, pattern->str); - } - } - - return 0; -} - -/* - * Triggered by refresh_interval, it re-scan the path looking for new files - * that match the original path pattern. - */ -int flb_tail_scan_callback(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - int ret; - struct flb_tail_config *ctx = context; - (void) config; - - ret = flb_tail_scan(ctx->path_list, ctx); - if (ret > 0) { - flb_plg_debug(ins, "%i new files found", ret); - } - - return ret; -} diff --git a/fluent-bit/plugins/in_tail/tail_scan.h b/fluent-bit/plugins/in_tail/tail_scan.h deleted file mode 100644 index ec3c96a2a..000000000 --- a/fluent-bit/plugins/in_tail/tail_scan.h +++ /dev/null @@ -1,29 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_SCAN_H -#define FLB_TAIL_SCAN_H - -#include "tail_config.h" - -int flb_tail_scan(struct mk_list *path, struct flb_tail_config *ctx); -int flb_tail_scan_callback(struct flb_input_instance *ins, - struct flb_config *config, void *context); - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_scan_glob.c b/fluent-bit/plugins/in_tail/tail_scan_glob.c deleted file mode 100644 index b330b7c3b..000000000 --- a/fluent-bit/plugins/in_tail/tail_scan_glob.c +++ /dev/null @@ -1,278 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/types.h> -#include <sys/stat.h> -#include <glob.h> -#include <fnmatch.h> - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_utils.h> - -#include "tail.h" -#include "tail_file.h" -#include "tail_signal.h" -#include "tail_scan.h" -#include "tail_config.h" - -/* Define missing GLOB_TILDE if not exists */ -#ifndef GLOB_TILDE -#define GLOB_TILDE 1<<2 /* use GNU Libc value */ -#define UNSUP_TILDE 1 - -/* we need these extra headers for path resolution */ -#include <limits.h> -#include <sys/types.h> -#include <pwd.h> - -static char *expand_tilde(const char *path) -{ - int len; - char user[256]; - char *p = NULL; - char *dir = NULL; - char *tmp = NULL; - struct passwd *uinfo = NULL; - - if (path[0] == '~') { - p = strchr(path, '/'); - - if (p) { - /* check case '~/' */ - if ((p - path) == 1) { - dir = getenv("HOME"); - if (!dir) { - return path; - } - } - else { - /* - * it refers to a different user: ~user/abc, first step grab - * the user name. - */ - len = (p - path) - 1; - memcpy(user, path + 1, len); - user[len] = '\0'; - - /* use getpwnam() to resolve user information */ - uinfo = getpwnam(user); - if (!uinfo) { - return path; - } - - dir = uinfo->pw_dir; - } - } - else { - dir = getenv("HOME"); - if (!dir) { - return path; - } - } - - if (p) { - tmp = flb_malloc(PATH_MAX); - if (!tmp) { - flb_errno(); - return NULL; - } - snprintf(tmp, PATH_MAX - 1, "%s%s", dir, p); - } - else { - dir = getenv("HOME"); - if (!dir) { - return path; - } - - tmp = flb_strdup(dir); - if (!tmp) { - return path; - } - } - - return tmp; - } - - return path; -} -#endif - -static int tail_is_excluded(char *path, struct flb_tail_config *ctx) -{ - struct mk_list *head; - struct flb_slist_entry *pattern; - - if (!ctx->exclude_list) { - return FLB_FALSE; - } - - mk_list_foreach(head, ctx->exclude_list) { - pattern = mk_list_entry(head, struct flb_slist_entry, _head); - if (fnmatch(pattern->str, path, 0) == 0) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -static inline int do_glob(const char *pattern, int flags, - void *not_used, glob_t *pglob) -{ - int ret; - int new_flags; - char *tmp = NULL; - int tmp_needs_free = FLB_FALSE; - (void) not_used; - - /* Save current values */ - new_flags = flags; - - if (flags & GLOB_TILDE) { -#ifdef UNSUP_TILDE - /* - * Some libc libraries like Musl do not support GLOB_TILDE for tilde - * expansion. A workaround is to use wordexp(3) but looking at it - * implementation in Musl it looks quite expensive: - * - * http://git.musl-libc.org/cgit/musl/tree/src/misc/wordexp.c - * - * the workaround is to do our own tilde expansion in a temporary buffer. - */ - - /* Look for a tilde */ - tmp = expand_tilde(pattern); - if (tmp != pattern) { - /* the path was expanded */ - pattern = tmp; - tmp_needs_free = FLB_TRUE; - } - - /* remove unused flag */ - new_flags &= ~GLOB_TILDE; -#endif - } - - /* invoke glob with new parameters */ - ret = glob(pattern, new_flags, NULL, pglob); - - /* remove temporary buffer, if allocated by expand_tilde above. - * Note that this buffer is only used for libc implementations - * that do not support the GLOB_TILDE flag, like musl. */ - if ((tmp != NULL) && (tmp_needs_free == FLB_TRUE)) { - flb_free(tmp); - } - - return ret; -} - -/* Scan a path, register the entries and return how many */ -static int tail_scan_path(const char *path, struct flb_tail_config *ctx) -{ - int i; - int ret; - int count = 0; - glob_t globbuf; - time_t now; - int64_t mtime; - struct stat st; - - flb_plg_debug(ctx->ins, "scanning path %s", path); - - /* Safe reset for globfree() */ - globbuf.gl_pathv = NULL; - - /* Scan the given path */ - ret = do_glob(path, GLOB_TILDE | GLOB_ERR, NULL, &globbuf); - if (ret != 0) { - switch (ret) { - case GLOB_NOSPACE: - flb_plg_error(ctx->ins, "no memory space available"); - return -1; - case GLOB_ABORTED: - flb_plg_error(ctx->ins, "read error, check permissions: %s", path); - return -1; - case GLOB_NOMATCH: - ret = stat(path, &st); - if (ret == -1) { - flb_plg_debug(ctx->ins, "cannot read info from: %s", path); - } - else { - ret = access(path, R_OK); - if (ret == -1 && errno == EACCES) { - flb_plg_error(ctx->ins, "NO read access for path: %s", path); - } - else { - flb_plg_debug(ctx->ins, "NO matches for path: %s", path); - } - } - return 0; - } - } - - - /* For every entry found, generate an output list */ - now = time(NULL); - for (i = 0; i < globbuf.gl_pathc; i++) { - ret = stat(globbuf.gl_pathv[i], &st); - if (ret == 0 && S_ISREG(st.st_mode)) { - /* Check if this file is blacklisted */ - if (tail_is_excluded(globbuf.gl_pathv[i], ctx) == FLB_TRUE) { - flb_plg_debug(ctx->ins, "excluded=%s", globbuf.gl_pathv[i]); - continue; - } - - if (ctx->ignore_older > 0) { - mtime = flb_tail_stat_mtime(&st); - if (mtime > 0) { - if ((now - ctx->ignore_older) > mtime) { - flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)", - globbuf.gl_pathv[i]); - continue; - } - } - } - - /* Append file to list */ - ret = flb_tail_file_append(globbuf.gl_pathv[i], &st, - FLB_TAIL_STATIC, ctx); - if (ret == 0) { - flb_plg_debug(ctx->ins, "scan_glob add(): %s, inode %li", - globbuf.gl_pathv[i], st.st_ino); - count++; - } - else { - flb_plg_debug(ctx->ins, "scan_blog add(): dismissed: %s, inode %li", - globbuf.gl_pathv[i], st.st_ino); - } - } - else { - flb_plg_debug(ctx->ins, "skip (invalid) entry=%s", - globbuf.gl_pathv[i]); - } - } - - if (count > 0) { - tail_signal_manager(ctx); - } - - globfree(&globbuf); - return count; -} diff --git a/fluent-bit/plugins/in_tail/tail_scan_win32.c b/fluent-bit/plugins/in_tail/tail_scan_win32.c deleted file mode 100644 index 94733f065..000000000 --- a/fluent-bit/plugins/in_tail/tail_scan_win32.c +++ /dev/null @@ -1,245 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * This file implements glob-like patch matching feature for Windows - * based on Win32 API. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_utils.h> - -#include <shlwapi.h> - -#include "tail.h" -#include "tail_file.h" -#include "tail_signal.h" -#include "tail_config.h" - -#include "win32.h" - -static int tail_is_excluded(char *path, struct flb_tail_config *ctx) -{ - struct mk_list *head; - struct flb_slist_entry *pattern; - - if (!ctx->exclude_list) { - return FLB_FALSE; - } - - mk_list_foreach(head, ctx->exclude_list) { - pattern = mk_list_entry(head, struct flb_slist_entry, _head); - if (PathMatchSpecA(path, pattern->str)) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -/* - * This function is a thin wrapper over flb_tail_file_append(), - * adding normalization and sanity checks on top of it. - */ -static int tail_register_file(const char *target, struct flb_tail_config *ctx, - time_t ts) -{ - int64_t mtime; - struct stat st; - char path[MAX_PATH]; - - if (_fullpath(path, target, MAX_PATH) == NULL) { - flb_plg_error(ctx->ins, "cannot get absolute path of %s", target); - return -1; - } - - if (stat(path, &st) != 0 || !S_ISREG(st.st_mode)) { - return -1; - } - - if (ctx->ignore_older > 0) { - mtime = flb_tail_stat_mtime(&st); - if (mtime > 0) { - if ((ts - ctx->ignore_older) > mtime) { - flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)", - target); - return -1; - } - } - } - - if (tail_is_excluded(path, ctx) == FLB_TRUE) { - flb_plg_trace(ctx->ins, "skip '%s' (excluded)", path); - return -1; - } - - return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ctx); -} - -/* - * Perform patern match on the given path string. This function - * supports patterns with "nested" wildcards like below. - * - * tail_scan_pattern("C:\fluent-bit\*\*.txt", ctx); - * - * On success, the number of files found is returned (zero indicates - * "no file found"). On error, -1 is returned. - */ -static int tail_scan_pattern(const char *path, struct flb_tail_config *ctx) -{ - char *star, *p0, *p1; - char pattern[MAX_PATH]; - char buf[MAX_PATH]; - int ret; - int n_added = 0; - time_t now; - int64_t mtime; - HANDLE h; - WIN32_FIND_DATA data; - - if (strlen(path) > MAX_PATH - 1) { - flb_plg_error(ctx->ins, "path too long '%s'"); - return -1; - } - - star = strchr(path, '*'); - if (star == NULL) { - return -1; - } - - /* - * C:\data\tmp\input_*.conf - * 0<-----| - */ - p0 = star; - while (path <= p0 && *p0 != '\\') { - p0--; - } - - /* - * C:\data\tmp\input_*.conf - * |---->1 - */ - p1 = star; - while (*p1 && *p1 != '\\') { - p1++; - } - - memcpy(pattern, path, (p1 - path)); - pattern[p1 - path] = '\0'; - - h = FindFirstFileA(pattern, &data); - if (h == INVALID_HANDLE_VALUE) { - return 0; /* none matched */ - } - - now = time(NULL); - do { - /* Ignore the current and parent dirs */ - if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) { - continue; - } - - /* Avoid an infinite loop */ - if (strchr(data.cFileName, '*')) { - continue; - } - - /* Create a path (prefix + filename + suffix) */ - memcpy(buf, path, p0 - path + 1); - buf[p0 - path + 1] = '\0'; - - if (strlen(buf) + strlen(data.cFileName) + strlen(p1) > MAX_PATH - 1) { - flb_plg_warn(ctx->ins, "'%s%s%s' is too long", buf, data.cFileName, p1); - continue; - } - strcat(buf, data.cFileName); - strcat(buf, p1); - - if (strchr(p1, '*')) { - ret = tail_scan_pattern(buf, ctx); /* recursive */ - if (ret >= 0) { - n_added += ret; - } - continue; - } - - /* Try to register the target file */ - ret = tail_register_file(buf, ctx, now); - if (ret == 0) { - n_added++; - } - } while (FindNextFileA(h, &data) != 0); - - FindClose(h); - return n_added; -} - -static int tail_filepath(char *buf, int len, const char *basedir, const char *filename) -{ - char drive[_MAX_DRIVE]; - char dir[_MAX_DIR]; - char fname[_MAX_FNAME]; - char ext[_MAX_EXT]; - char tmp[MAX_PATH]; - int ret; - - ret = _splitpath_s(basedir, drive, _MAX_DRIVE, dir, _MAX_DIR, NULL, 0, NULL, 0); - if (ret) { - return -1; - } - - ret = _splitpath_s(filename, NULL, 0, NULL, 0, fname, _MAX_FNAME, ext, _MAX_EXT); - if (ret) { - return -1; - } - - ret = _makepath_s(tmp, MAX_PATH, drive, dir, fname, ext); - if (ret) { - return -1; - } - - if (_fullpath(buf, tmp, len) == NULL) { - return -1; - } - - return 0; -} - -static int tail_scan_path(const char *path, struct flb_tail_config *ctx) -{ - int ret; - int n_added = 0; - time_t now; - - if (strchr(path, '*')) { - return tail_scan_pattern(path, ctx); - } - - /* No wildcard involved. Let's just handle the file... */ - now = time(NULL); - ret = tail_register_file(path, ctx, now); - if (ret == 0) { - n_added++; - } - - return n_added; -} diff --git a/fluent-bit/plugins/in_tail/tail_signal.h b/fluent-bit/plugins/in_tail/tail_signal.h deleted file mode 100644 index 1a81fec64..000000000 --- a/fluent-bit/plugins/in_tail/tail_signal.h +++ /dev/null @@ -1,98 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_SIGNAL_H -#define FLB_TAIL_SIGNAL_H - -#include "tail_config.h" - -static inline int tail_signal_manager(struct flb_tail_config *ctx) -{ - int n; - uint64_t val = 0xc001; - - /* - * The number of signal reads might be less than the written signals, this - * means that some event is still pending in the queue. On that case we - * don't need to signal it again. - */ - if (ctx->ch_reads < ctx->ch_writes) { - return 1; - } - - /* Reset counters: prevent an overflow, unlikely..but let's keep safe */ - if (ctx->ch_reads == ctx->ch_writes) { - ctx->ch_reads = 0; - ctx->ch_writes = 0; - } - - /* Insert a dummy event into the channel manager */ - n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val)); - if (n == -1) { - flb_errno(); - return -1; - } - else { - ctx->ch_writes++; - } - - return n; -} - -static inline int tail_signal_pending(struct flb_tail_config *ctx) -{ - int n; - uint64_t val = 0xc002; - - /* Insert a dummy event into the 'pending' channel */ - n = flb_pipe_w(ctx->ch_pending[1], (const char *) &val, sizeof(val)); - - /* - * If we get EAGAIN, it simply means pending channel is full. As - * notification is already pending, it's safe to ignore. - */ - if (n == -1 && !FLB_PIPE_WOULDBLOCK()) { - flb_errno(); - return -1; - } - - return n; -} - -static inline int tail_consume_pending(struct flb_tail_config *ctx) -{ - int ret; - uint64_t val; - - /* - * We need to consume the pending bytes. Loop until we would have - * blocked (pipe is empty). - */ - do { - ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val)); - if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) { - flb_errno(); - return -1; - } - } while (!FLB_PIPE_WOULDBLOCK()); - - return 0; -} - -#endif diff --git a/fluent-bit/plugins/in_tail/tail_sql.h b/fluent-bit/plugins/in_tail/tail_sql.h deleted file mode 100644 index 855933a01..000000000 --- a/fluent-bit/plugins/in_tail/tail_sql.h +++ /dev/null @@ -1,65 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_SQL_H -#define FLB_TAIL_SQL_H - -/* - * In Fluent Bit we try to have a common convention for table names, - * if the table belong to an input/output plugin, use plugin name - * plus to what it's about, e.g: - * - * in_tail plugin table to track files: in_tail_files - */ -#define SQL_CREATE_FILES \ - "CREATE TABLE IF NOT EXISTS in_tail_files (" \ - " id INTEGER PRIMARY KEY," \ - " name TEXT NOT NULL," \ - " offset INTEGER," \ - " inode INTEGER," \ - " created INTEGER," \ - " rotated INTEGER DEFAULT 0" \ - ");" - -#define SQL_GET_FILE \ - "SELECT * from in_tail_files WHERE inode=@inode order by id desc;" - -#define SQL_INSERT_FILE \ - "INSERT INTO in_tail_files (name, offset, inode, created)" \ - " VALUES (@name, @offset, @inode, @created);" - -#define SQL_ROTATE_FILE \ - "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;" - -#define SQL_UPDATE_OFFSET \ - "UPDATE in_tail_files set offset=@offset WHERE id=@id;" - -#define SQL_DELETE_FILE \ - "DELETE FROM in_tail_files WHERE id=@id;" - -#define SQL_PRAGMA_SYNC \ - "PRAGMA synchronous=%i;" - -#define SQL_PRAGMA_JOURNAL_MODE \ - "PRAGMA journal_mode=%s;" - -#define SQL_PRAGMA_LOCKING_MODE \ - "PRAGMA locking_mode=EXCLUSIVE;" - -#endif diff --git a/fluent-bit/plugins/in_tail/win32.h b/fluent-bit/plugins/in_tail/win32.h deleted file mode 100644 index a9414f892..000000000 --- a/fluent-bit/plugins/in_tail/win32.h +++ /dev/null @@ -1,67 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * This is the interface file that replaces POSIX functions - * with our own custom implementation. - */ - -#ifndef FLB_TAIL_WIN32_H -#define FLB_TAIL_WIN32_H - -#include "win32/interface.h" - -#undef open -#undef stat -#undef lstat -#undef fstat -#undef lseek - -#undef S_IFDIR -#undef S_IFCHR -#undef S_IFIFO -#undef S_IFREG -#undef S_IFLNK -#undef S_IFMT -#undef S_ISDIR -#undef S_ISCHR -#undef S_ISFIFO -#undef S_ISREG -#undef S_ISLNK - -#define open win32_open -#define stat win32_stat -#define lstat win32_lstat -#define fstat win32_fstat - -#define lseek _lseeki64 - -#define S_IFDIR WIN32_S_IFDIR -#define S_IFCHR WIN32_S_IFCHR -#define S_IFIFO WIN32_S_IFIFO -#define S_IFREG WIN32_S_IFREG -#define S_IFLNK WIN32_S_IFLNK -#define S_IFMT WIN32_S_IFMT - -#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) -#define S_ISCHR(m) (((m) & S_IFMT) == S_IFCHR) -#define S_ISIFO(m) (((m) & S_IFMT) == S_IFIFO) -#define S_ISREG(m) (((m) & S_IFMT) == S_IFREG) -#define S_ISLNK(m) (((m) & S_IFMT) == S_IFLNK) -#endif diff --git a/fluent-bit/plugins/in_tail/win32/interface.h b/fluent-bit/plugins/in_tail/win32/interface.h deleted file mode 100644 index 73b2ef233..000000000 --- a/fluent-bit/plugins/in_tail/win32/interface.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_TAIL_WIN32_INTERFACE_H -#define FLB_TAIL_WIN32_INTERFACE_H - -struct win32_stat { - uint64_t st_ino; - uint16_t st_mode; - int64_t st_mtime; - int16_t st_nlink; - int64_t st_size; -}; - -int win32_stat(const char *path, struct win32_stat *wst); -int win32_lstat(const char *path, struct win32_stat *wst); -int win32_fstat(int fd, struct win32_stat *wst); - -int win32_open(const char *path, int flags); - -#define WIN32_S_IFDIR 0x1000 -#define WIN32_S_IFCHR 0x2000 -#define WIN32_S_IFIFO 0x4000 -#define WIN32_S_IFREG 0x8000 -#define WIN32_S_IFLNK 0xc000 -#define WIN32_S_IFMT 0xf000 - -#endif diff --git a/fluent-bit/plugins/in_tail/win32/io.c b/fluent-bit/plugins/in_tail/win32/io.c deleted file mode 100644 index 45928b04a..000000000 --- a/fluent-bit/plugins/in_tail/win32/io.c +++ /dev/null @@ -1,47 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <Windows.h> -#include <stdlib.h> -#include <stdint.h> -#include <fcntl.h> -#include <io.h> -#include "interface.h" - -/* - * POSIX IO emulation tailored for in_tail's usage. - * - * open(2) that does not acquire an exclusive lock. - */ - -int win32_open(const char *path, int flags) -{ - HANDLE h; - h = CreateFileA(path, - GENERIC_READ, - FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, - NULL, /* lpSecurityAttributes */ - OPEN_EXISTING, /* dwCreationDisposition */ - 0, /* dwFlagsAndAttributes */ - NULL); /* hTemplateFile */ - if (h == INVALID_HANDLE_VALUE) { - return -1; - } - return _open_osfhandle((intptr_t) h, _O_RDONLY); -} diff --git a/fluent-bit/plugins/in_tail/win32/stat.c b/fluent-bit/plugins/in_tail/win32/stat.c deleted file mode 100644 index bce802749..000000000 --- a/fluent-bit/plugins/in_tail/win32/stat.c +++ /dev/null @@ -1,332 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <Windows.h> -#include <stdlib.h> -#include <stdint.h> -#include <io.h> -#include "interface.h" - -/* - * NTFS stat(2) emulation tailored for in_tail's usage. - * - * (1) Support st_ino (inode) for Windows NTFS. - * (2) Support NTFS symlinks. - * (3) Support large files >= 2GB. - * - * To use it, include "win32.h" and it will transparently - * replace stat(), lstat() and fstat(). - */ - -#define UINT64(high, low) ((uint64_t) (high) << 32 | (low)) -#define WINDOWS_TICKS_TO_SECONDS_RATIO 10000000 -#define WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA 11644473600 - -/* - * FILETIME timestamps are represented in 100-nanosecond intervals, - * because of this, that's why we need to divide the number by 10000000 - * in order to convert it to seconds. - * - * While UNIX timestamps use January 1, 1970 as epoch Windows FILETIME - * timestamps use January 1, 1601. Because of this we need to subtract - * 11644473600 seconds to account for it. - * - * Note: Even though this does not account for leap seconds it should be - * accurate enough. - */ - -static uint64_t filetime_to_epoch(FILETIME *ft) -{ - ULARGE_INTEGER timestamp; - - if (ft == NULL) { - return 0; - } - - timestamp.HighPart = ft->dwHighDateTime; - timestamp.LowPart = ft->dwLowDateTime; - - timestamp.QuadPart /= WINDOWS_TICKS_TO_SECONDS_RATIO; - timestamp.QuadPart -= WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA; - - return timestamp.QuadPart; -} - -static void reset_errno() -{ - errno = 0; -} - -static void propagate_last_error_to_errno() -{ - DWORD error_code; - - error_code = GetLastError(); - - switch (error_code) { - case ERROR_INVALID_TARGET_HANDLE: - case ERROR_INVALID_HANDLE: - errno = EBADF; - break; - - case ERROR_TOO_MANY_OPEN_FILES: - errno = EMFILE; - break; - - case ERROR_INVALID_FLAG_NUMBER: - case ERROR_INVALID_PARAMETER: - errno = EINVAL; - break; - - case ERROR_NOT_ENOUGH_MEMORY: - case ERROR_OUTOFMEMORY: - errno = ENOMEM; - break; - - case ERROR_SHARING_VIOLATION: - case ERROR_LOCK_VIOLATION: - case ERROR_PATH_BUSY: - case ERROR_BUSY: - errno = EBUSY; - break; - - case ERROR_HANDLE_DISK_FULL: - case ERROR_DISK_FULL: - errno = ENOSPC; - break; - - case ERROR_INVALID_ADDRESS: - errno = EFAULT; - break; - - case ERROR_FILE_TOO_LARGE: - errno = EFBIG; - break; - - case ERROR_ALREADY_EXISTS: - case ERROR_FILE_EXISTS: - errno = EEXIST; - break; - - case ERROR_FILE_NOT_FOUND: - case ERROR_PATH_NOT_FOUND: - case ERROR_INVALID_DRIVE: - case ERROR_BAD_PATHNAME: - case ERROR_INVALID_NAME: - case ERROR_BAD_UNIT: - errno = ENOENT; - break; - - case ERROR_SEEK_ON_DEVICE: - case ERROR_NEGATIVE_SEEK: - errno = ESPIPE; - break; - - case ERROR_ACCESS_DENIED: - errno = EACCES; - break; - - case ERROR_DIR_NOT_EMPTY: - errno = ENOTEMPTY; - break; - - case ERROR_BROKEN_PIPE: - errno = EPIPE; - break; - - case ERROR_GEN_FAILURE: - errno = EIO; - break; - - case ERROR_OPEN_FAILED: - errno = EIO; - break; - - case ERROR_SUCCESS: - errno = 0; - break; - - default: - /* This is just a canary, if you find this - * error then it means we need to expand the - * translation list. - */ - - errno = EOWNERDEAD; - break; - } -} - -static int get_mode(unsigned int attr) -{ - if (attr & FILE_ATTRIBUTE_DIRECTORY) { - return WIN32_S_IFDIR; - } - return WIN32_S_IFREG; -} - - - -static int is_symlink(const char *path) -{ - WIN32_FIND_DATA data; - HANDLE h; - - SetLastError(0); - reset_errno(); - - h = FindFirstFileA(path, &data); - - if (h == INVALID_HANDLE_VALUE) { - propagate_last_error_to_errno(); - - return 0; - } - - FindClose(h); - - /* - * A NTFS symlink is a file with a bit of metadata ("reparse point"), - * So (1) check if the file has metadata and then (2) confirm that - * it is indeed a symlink. - */ - if (data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { - if (data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) { - return 1; - } - } - - return 0; -} - -static int hstat(HANDLE h, struct win32_stat *wst) -{ - BY_HANDLE_FILE_INFORMATION info; - FILE_STANDARD_INFO std; - - SetLastError(0); - reset_errno(); - - if (!GetFileInformationByHandle(h, &info)) { - propagate_last_error_to_errno(); - - return -1; - } - - if (!GetFileInformationByHandleEx(h, FileStandardInfo, - &std, sizeof(std))) { - propagate_last_error_to_errno(); - - return -1; - } - - wst->st_nlink = std.NumberOfLinks; - if (std.DeletePending) { - wst->st_nlink = 0; - } - - wst->st_mode = get_mode(info.dwFileAttributes); - wst->st_size = UINT64(info.nFileSizeHigh, info.nFileSizeLow); - wst->st_ino = UINT64(info.nFileIndexHigh, info.nFileIndexLow); - wst->st_mtime = filetime_to_epoch(&info.ftLastWriteTime); - - return 0; -} - -int win32_stat(const char *path, struct win32_stat *wst) -{ - HANDLE h; - - SetLastError(0); - reset_errno(); - - h = CreateFileA(path, - GENERIC_READ, - FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, - NULL, /* lpSecurityAttributes */ - OPEN_EXISTING, /* dwCreationDisposition */ - 0, /* dwFlagsAndAttributes */ - NULL); /* hTemplateFile */ - - if (h == INVALID_HANDLE_VALUE) { - propagate_last_error_to_errno(); - - return -1; - } - - if (hstat(h, wst)) { - CloseHandle(h); - return -1; - } - - CloseHandle(h); - return 0; -} - -int win32_lstat(const char *path, struct win32_stat *wst) -{ - HANDLE h; - - SetLastError(0); - reset_errno(); - - h = CreateFileA(path, - GENERIC_READ, - FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, - NULL, /* lpSecurityAttributes */ - OPEN_EXISTING, /* dwCreationDisposition */ - FILE_FLAG_OPEN_REPARSE_POINT, - NULL); /* hTemplateFile */ - - if (h == INVALID_HANDLE_VALUE) { - propagate_last_error_to_errno(); - - return -1; - } - - if (hstat(h, wst)) { - CloseHandle(h); - return -1; - } - - if (is_symlink(path)) { - wst->st_mode = WIN32_S_IFLNK; - } - - CloseHandle(h); - return 0; -} - -int win32_fstat(int fd, struct win32_stat *wst) -{ - HANDLE h; - - SetLastError(0); - reset_errno(); - - h = (HANDLE) _get_osfhandle(fd); - - if (h == INVALID_HANDLE_VALUE) { - propagate_last_error_to_errno(); - - return -1; - } - - return hstat(h, wst); -} |