diff options
Diffstat (limited to 'fluent-bit/plugins/in_tail')
29 files changed, 7244 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/CMakeLists.txt b/fluent-bit/plugins/in_tail/CMakeLists.txt new file mode 100644 index 000000000..31d865218 --- /dev/null +++ b/fluent-bit/plugins/in_tail/CMakeLists.txt @@ -0,0 +1,37 @@ +set(src + tail_file.c + tail_dockermode.c + tail_scan.c + tail_config.c + tail_fs_stat.c + tail.c) + +if(FLB_HAVE_INOTIFY) +set(src + ${src} + tail_fs_inotify.c) +endif() + +if(FLB_SQLDB) +set(src + ${src} + tail_db.c) +endif() + +if(FLB_PARSER) + set(src + ${src} + tail_multiline.c + ) +endif() + +if(MSVC) + set(src + ${src} + win32/stat.c + win32/io.c + ) + FLB_PLUGIN(in_tail "${src}" "Shlwapi") +else() + FLB_PLUGIN(in_tail "${src}" "") +endif() diff --git a/fluent-bit/plugins/in_tail/tail.c b/fluent-bit/plugins/in_tail/tail.c new file mode 100644 index 000000000..34a0fec3d --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail.c @@ -0,0 +1,783 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_error.h> +#include <fluent-bit/flb_utils.h> + +#include "tail.h" +#include "tail_fs.h" +#include "tail_db.h" +#include "tail_file.h" +#include "tail_scan.h" +#include "tail_signal.h" +#include "tail_config.h" +#include "tail_dockermode.h" +#include "tail_multiline.h" + +static inline int consume_byte(flb_pipefd_t fd) +{ + int ret; + uint64_t val; + + /* We need to consume the byte */ + ret = flb_pipe_r(fd, (char *) &val, sizeof(val)); + if (ret <= 0) { + flb_errno(); + return -1; + } + + return 0; +} + +/* cb_collect callback */ +static int in_tail_collect_pending(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + int active = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_config *ctx = in_context; + struct flb_tail_file *file; + struct stat st; + uint64_t pre; + uint64_t total_processed = 0; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + if (file->watch_fd == -1 || + (file->offset >= file->size)) { + ret = fstat(file->fd, &st); + + if (ret == -1) { + flb_errno(); + flb_tail_file_remove(file); + continue; + } + + file->size = st.st_size; + file->pending_bytes = (file->size - file->offset); + } + else { + memset(&st, 0, sizeof(struct stat)); + } + + if (file->pending_bytes <= 0) { + file->pending_bytes = (file->size - file->offset); + } + + if (file->pending_bytes <= 0) { + continue; + } + + if (ctx->event_batch_size > 0 && + total_processed >= ctx->event_batch_size) { + break; + } + + /* get initial offset to calculate the number of processed bytes later */ + pre = file->offset; + + ret = flb_tail_file_chunk(file); + + /* Update the total number of bytes processed */ + if (file->offset > pre) { + total_processed += (file->offset - pre); + } + + switch (ret) { + case FLB_TAIL_ERROR: + /* Could not longer read the file */ + flb_tail_file_remove(file); + break; + case FLB_TAIL_OK: + case FLB_TAIL_BUSY: + /* + * Adjust counter to verify if we need a further read(2) later. + * For more details refer to tail_fs_inotify.c:96. + */ + if (file->offset < file->size) { + file->pending_bytes = (file->size - file->offset); + active++; + } + else { + file->pending_bytes = 0; + } + break; + } + } + + /* If no more active files, consume pending signal so we don't get called again. */ + if (active == 0) { + tail_consume_pending(ctx); + } + + return 0; +} + +/* cb_collect callback */ +static int in_tail_collect_static(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + int active = 0; + int pre_size; + int pos_size; + int alter_size = 0; + int completed = FLB_FALSE; + char s_size[32]; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_config *ctx = in_context; + struct flb_tail_file *file; + uint64_t pre; + uint64_t total_processed = 0; + + /* Do a data chunk collection for each file */ + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + /* + * The list 'files_static' represents all the files that were discovered + * on startup that already contains data: these are called 'static files'. + * + * When processing static files, we don't know what kind of content they + * have and what kind of 'latency' might add to process all of them in + * a row. Despite we always 'try' to do a full round and process a + * fraction of them on every invocation of this function if we have a + * huge number of files we will face latency and make the main pipeline + * to degrade performance. + * + * In order to avoid this situation, we added a new option to the plugin + * called 'static_batch_size' which basically defines how many bytes can + * be processed on every invocation to process the static files. + * + * When the limit is reached, we just break the loop and as a side effect + * we allow other events keep processing. + */ + if (ctx->static_batch_size > 0 && + total_processed >= ctx->static_batch_size) { + break; + } + + /* get initial offset to calculate the number of processed bytes later */ + pre = file->offset; + + /* Process the file */ + ret = flb_tail_file_chunk(file); + + /* Update the total number of bytes processed */ + if (file->offset > pre) { + total_processed += (file->offset - pre); + } + + switch (ret) { + case FLB_TAIL_ERROR: + /* Could not longer read the file */ + flb_plg_debug(ctx->ins, "inode=%"PRIu64" collect static ERROR", + file->inode); + flb_tail_file_remove(file); + break; + case FLB_TAIL_OK: + case FLB_TAIL_BUSY: + active++; + break; + case FLB_TAIL_WAIT: + if (file->config->exit_on_eof) { + flb_plg_info(ctx->ins, "inode=%"PRIu64" file=%s ended, stop", + file->inode, file->name); + if (ctx->files_static_count == 1) { + flb_engine_exit(config); + } + } + /* Promote file to 'events' type handler */ + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s promote to TAIL_EVENT", + file->inode, file->name); + + /* + * When promoting a file from 'static' to 'event' mode, the promoter + * will check if the file has been rotated while it was being + * processed on this function, if so, it will try to check for the + * following condition: + * + * "discover a new possible file created due to rotation" + * + * If the condition above is met, a new file entry will be added to + * the list that we are processing and a 'new signal' will be send + * to the signal manager. But the signal manager will trigger the + * message only if no pending messages exists (to avoid queue size + * exhaustion). + * + * All good, but there is a corner case where if no 'active' files + * exists, the signal will be read and this function will not be + * called again and since the signal did not triggered the + * message, the 'new file' enqueued by the nested function + * might stay in stale mode (note that altering the length of this + * list will not be reflected yet) + * + * To fix the corner case, we use a variable called 'alter_size' + * that determinate if the size of the list keeps the same after + * a rotation, so it means: a new file was added. + * + * We use 'alter_size' as a helper in the conditional below to know + * when to stop processing the static list. + */ + if (alter_size == 0) { + pre_size = ctx->files_static_count; + } + ret = flb_tail_file_to_event(file); + if (ret == -1) { + flb_plg_debug(ctx->ins, "file=%s cannot promote, unregistering", + file->name); + flb_tail_file_remove(file); + } + + if (alter_size == 0) { + pos_size = ctx->files_static_count; + if (pre_size == pos_size) { + alter_size++; + } + } + break; + } + } + + /* + * If there are no more active static handlers, we consume the 'byte' that + * triggered this event so this is not longer called again. + */ + if (active == 0 && alter_size == 0) { + consume_byte(ctx->ch_manager[0]); + ctx->ch_reads++; + completed = FLB_TRUE; + } + + /* Debugging number of processed bytes */ + if (flb_log_check_level(ctx->ins->log_level, FLB_LOG_DEBUG)) { + flb_utils_bytes_to_human_readable_size(total_processed, + s_size, sizeof(s_size)); + if (completed) { + flb_plg_debug(ctx->ins, "[static files] processed %s, done", s_size); + } + else { + flb_plg_debug(ctx->ins, "[static files] processed %s", s_size); + } + } + + return 0; +} + +static int in_tail_watcher_callback(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + int ret = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_config *ctx = context; + struct flb_tail_file *file; + (void) config; + + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + if (file->is_link == FLB_TRUE) { + ret = flb_tail_file_is_rotated(ctx, file); + if (ret == FLB_FALSE) { + continue; + } + + /* The symbolic link name has been rotated */ + flb_tail_file_rotated(file); + } + } + return ret; +} + +int in_tail_collect_event(void *file, struct flb_config *config) +{ + int ret; + struct stat st; + struct flb_tail_file *f = file; + + ret = fstat(f->fd, &st); + if (ret == -1) { + flb_tail_file_remove(f); + return 0; + } + + ret = flb_tail_file_chunk(f); + switch (ret) { + case FLB_TAIL_ERROR: + /* Could not longer read the file */ + flb_tail_file_remove(f); + break; + case FLB_TAIL_OK: + case FLB_TAIL_WAIT: + break; + } + + return 0; +} + +/* Initialize plugin */ +static int in_tail_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret = -1; + struct flb_tail_config *ctx = NULL; + + /* Allocate space for the configuration */ + ctx = flb_tail_config_create(in, config); + if (!ctx) { + return -1; + } + ctx->ins = in; + + /* Initialize file-system watcher */ + ret = flb_tail_fs_init(in, ctx, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + + /* Scan path */ + flb_tail_scan(ctx->path_list, ctx); + + /* + * After the first scan (on start time), all new files discovered needs to be + * read from head, so we switch the 'read_from_head' flag to true so any + * other file discovered after a scan or a rotation are read from the + * beginning. + */ + ctx->read_from_head = FLB_TRUE; + + /* Set plugin context */ + flb_input_set_context(in, ctx); + + /* Register an event collector */ + ret = flb_input_set_collector_event(in, in_tail_collect_static, + ctx->ch_manager[0], config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_static = ret; + + /* Register re-scan: time managed by 'refresh_interval' property */ + ret = flb_input_set_collector_time(in, flb_tail_scan_callback, + ctx->refresh_interval_sec, + ctx->refresh_interval_nsec, + config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_scan = ret; + + /* Register watcher, interval managed by 'watcher_interval' property */ + ret = flb_input_set_collector_time(in, in_tail_watcher_callback, + ctx->watcher_interval, 0, + config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_watcher = ret; + + /* Register callback to purge rotated files */ + ret = flb_input_set_collector_time(in, flb_tail_file_purge, + ctx->rotate_wait, 0, + config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_rotated = ret; + + /* Register callback to process pending bytes in promoted files */ + ret = flb_input_set_collector_event(in, in_tail_collect_pending, + ctx->ch_pending[0], config);//1, 0, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_pending = ret; + + + if (ctx->multiline == FLB_TRUE && ctx->parser) { + ctx->parser = NULL; + flb_plg_warn(in, "on multiline mode 'Parser' is not allowed " + "(parser disabled)"); + } + + /* Register callback to process docker mode queued buffer */ + if (ctx->docker_mode == FLB_TRUE) { + ret = flb_input_set_collector_time(in, flb_tail_dmode_pending_flush, + ctx->docker_mode_flush, 0, + config); + if (ret == -1) { + ctx->docker_mode = FLB_FALSE; + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_dmode_flush = ret; + } + +#ifdef FLB_HAVE_PARSER + /* Register callback to process multiline queued buffer */ + if (ctx->multiline == FLB_TRUE) { + ret = flb_input_set_collector_time(in, flb_tail_mult_pending_flush, + ctx->multiline_flush, 0, + config); + if (ret == -1) { + ctx->multiline = FLB_FALSE; + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_mult_flush = ret; + } +#endif + + return 0; +} + +/* Pre-run callback / before the event loop */ +static int in_tail_pre_run(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_tail_config *ctx = in_context; + (void) ins; + + return tail_signal_manager(ctx); +} + +static int in_tail_exit(void *data, struct flb_config *config) +{ + (void) *config; + struct flb_tail_config *ctx = data; + + flb_tail_file_remove_all(ctx); + flb_tail_fs_exit(ctx); + flb_tail_config_destroy(ctx); + + return 0; +} + +static void in_tail_pause(void *data, struct flb_config *config) +{ + struct flb_tail_config *ctx = data; + + /* + * Pause general collectors: + * + * - static : static files lookup before promotion + */ + flb_input_collector_pause(ctx->coll_fd_static, ctx->ins); + flb_input_collector_pause(ctx->coll_fd_pending, ctx->ins); + + if (ctx->docker_mode == FLB_TRUE) { + flb_input_collector_pause(ctx->coll_fd_dmode_flush, ctx->ins); + if (config->is_ingestion_active == FLB_FALSE) { + flb_plg_info(ctx->ins, "flushing pending docker mode data..."); + flb_tail_dmode_pending_flush_all(ctx); + } + } + + if (ctx->multiline == FLB_TRUE) { + flb_input_collector_pause(ctx->coll_fd_mult_flush, ctx->ins); + if (config->is_ingestion_active == FLB_FALSE) { + flb_plg_info(ctx->ins, "flushing pending multiline data..."); + flb_tail_mult_pending_flush_all(ctx); + } + } + + /* Pause file system backend handlers */ + flb_tail_fs_pause(ctx); +} + +static void in_tail_resume(void *data, struct flb_config *config) +{ + struct flb_tail_config *ctx = data; + + flb_input_collector_resume(ctx->coll_fd_static, ctx->ins); + flb_input_collector_resume(ctx->coll_fd_pending, ctx->ins); + + if (ctx->docker_mode == FLB_TRUE) { + flb_input_collector_resume(ctx->coll_fd_dmode_flush, ctx->ins); + } + + if (ctx->multiline == FLB_TRUE) { + flb_input_collector_resume(ctx->coll_fd_mult_flush, ctx->ins); + } + + /* Pause file system backend handlers */ + flb_tail_fs_resume(ctx); +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_CLIST, "path", NULL, + 0, FLB_TRUE, offsetof(struct flb_tail_config, path_list), + "pattern specifying log files or multiple ones through " + "the use of common wildcards." + }, + { + FLB_CONFIG_MAP_CLIST, "exclude_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_tail_config, exclude_list), + "Set one or multiple shell patterns separated by commas to exclude " + "files matching a certain criteria, e.g: 'exclude_path *.gz,*.zip'" + }, + { + FLB_CONFIG_MAP_STR, "key", "log", + 0, FLB_TRUE, offsetof(struct flb_tail_config, key), + "when a message is unstructured (no parser applied), it's appended " + "as a string under the key name log. This option allows to define an " + "alternative name for that key." + }, + { + FLB_CONFIG_MAP_BOOL, "read_from_head", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head), + "For new discovered files on start (without a database offset/position), read the " + "content from the head of the file, not tail." + }, + { + FLB_CONFIG_MAP_STR, "refresh_interval", "60", + 0, FLB_FALSE, 0, + "interval to refresh the list of watched files expressed in seconds." + }, + { + FLB_CONFIG_MAP_TIME, "watcher_interval", "2s", + 0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval), + }, + { + FLB_CONFIG_MAP_TIME, "progress_check_interval", "2s", + 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval), + }, + { + FLB_CONFIG_MAP_INT, "progress_check_interval_nsec", "0", + 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval_nsec), + }, + { + FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT, + 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait), + "specify the number of extra time in seconds to monitor a file once is " + "rotated in case some pending data is flushed." + }, + { + FLB_CONFIG_MAP_BOOL, "docker_mode", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode), + "If enabled, the plugin will recombine split Docker log lines before " + "passing them to any parser as configured above. This mode cannot be " + "used at the same time as Multiline." + }, + { + FLB_CONFIG_MAP_INT, "docker_mode_flush", "4", + 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode_flush), + "wait period time in seconds to flush queued unfinished split lines." + + }, +#ifdef FLB_HAVE_REGEX + { + FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL, + 0, FLB_FALSE, 0, + "specify the parser name to fetch log first line for muliline log" + }, +#endif + { + FLB_CONFIG_MAP_STR, "path_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_tail_config, path_key), + "set the 'key' name where the name of monitored file will be appended." + }, + { + FLB_CONFIG_MAP_STR, "offset_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_tail_config, offset_key), + "set the 'key' name where the offset of monitored file will be appended." + }, + { + FLB_CONFIG_MAP_TIME, "ignore_older", "0", + 0, FLB_TRUE, offsetof(struct flb_tail_config, ignore_older), + "ignore files older than 'ignore_older'. Supports m,h,d (minutes, " + "hours, days) syntax. Default behavior is to read all the files." + }, + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_TAIL_CHUNK, + 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_chunk_size), + "set the initial buffer size to read data from files. This value is " + "used too to increase buffer size." + }, + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_TAIL_CHUNK, + 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_max_size), + "set the limit of the buffer size per monitored file. When a buffer " + "needs to be increased (e.g: very long lines), this value is used to " + "restrict how much the memory buffer can grow. If reading a file exceed " + "this limit, the file is removed from the monitored file list." + }, + { + FLB_CONFIG_MAP_SIZE, "static_batch_size", FLB_TAIL_STATIC_BATCH_SIZE, + 0, FLB_TRUE, offsetof(struct flb_tail_config, static_batch_size), + "On start, Fluent Bit might process files which already contains data, " + "these files are called 'static' files. The configuration property " + "in question set's the maximum number of bytes to process per iteration " + "for the static files monitored." + }, + { + FLB_CONFIG_MAP_SIZE, "event_batch_size", FLB_TAIL_EVENT_BATCH_SIZE, + 0, FLB_TRUE, offsetof(struct flb_tail_config, event_batch_size), + "When Fluent Bit is processing files in event based mode the amount of" + "data available for consumption could be too much and cause the input plugin " + "to over extend and smother other plugins" + "The configuration property sets the maximum number of bytes to process per iteration " + "for the files monitored (in event mode)." + }, + { + FLB_CONFIG_MAP_BOOL, "skip_long_lines", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_long_lines), + "if a monitored file reach it buffer capacity due to a very long line " + "(buffer_max_size), the default behavior is to stop monitoring that " + "file. This option alter that behavior and instruct Fluent Bit to skip " + "long lines and continue processing other lines that fits into the buffer." + }, + { + FLB_CONFIG_MAP_BOOL, "exit_on_eof", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, exit_on_eof), + "exit Fluent Bit when reaching EOF on a monitored file." + }, + + { + FLB_CONFIG_MAP_BOOL, "skip_empty_lines", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines), + "Allows to skip empty lines." + }, + +#ifdef FLB_HAVE_INOTIFY + { + FLB_CONFIG_MAP_BOOL, "inotify_watcher", "true", + 0, FLB_TRUE, offsetof(struct flb_tail_config, inotify_watcher), + "set to false to use file stat watcher instead of inotify." + }, +#endif +#ifdef FLB_HAVE_REGEX + { + FLB_CONFIG_MAP_STR, "parser", NULL, + 0, FLB_FALSE, 0, + "specify the parser name to process an unstructured message." + }, + { + FLB_CONFIG_MAP_STR, "tag_regex", NULL, + 0, FLB_FALSE, 0, + "set a regex to extract fields from the file name and use them later to " + "compose the Tag." + }, +#endif + +#ifdef FLB_HAVE_SQLDB + { + FLB_CONFIG_MAP_STR, "db", NULL, + 0, FLB_FALSE, 0, + "set a database file to keep track of monitored files and it offsets." + }, + { + FLB_CONFIG_MAP_STR, "db.sync", "normal", + 0, FLB_FALSE, 0, + "set a database sync method. values: extra, full, normal and off." + }, + { + FLB_CONFIG_MAP_BOOL, "db.locking", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, db_locking), + "set exclusive locking mode, increase performance but don't allow " + "external connections to the database file." + }, + { + FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL", + 0, FLB_TRUE, offsetof(struct flb_tail_config, db_journal_mode), + "Option to provide WAL configuration for Work Ahead Logging mechanism (WAL). Enabling WAL " + "provides higher performance. Note that WAL is not compatible with " + "shared network file systems." + }, +#endif + + /* Multiline Options */ +#ifdef FLB_HAVE_PARSER + { + FLB_CONFIG_MAP_BOOL, "multiline", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline), + "if enabled, the plugin will try to discover multiline messages and use " + "the proper parsers to compose the outgoing messages. Note that when this " + "option is enabled the Parser option is not used." + }, + { + FLB_CONFIG_MAP_TIME, "multiline_flush", FLB_TAIL_MULT_FLUSH, + 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline_flush), + "wait period time in seconds to process queued multiline messages." + }, + { + FLB_CONFIG_MAP_STR, "parser_firstline", NULL, + 0, FLB_FALSE, 0, + "name of the parser that matches the beginning of a multiline message. " + "Note that the regular expression defined in the parser must include a " + "group name (named capture)." + }, + { + FLB_CONFIG_MAP_STR_PREFIX, "parser_", NULL, + 0, FLB_FALSE, 0, + "optional extra parser to interpret and structure multiline entries. This " + "option can be used to define multiple parsers, e.g: Parser_1 ab1, " + "Parser_2 ab2, Parser_N abN." + }, + + /* Multiline Core Engine based API */ + { + FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_tail_config, multiline_parsers), + "specify one or multiple multiline parsers: docker, cri, go, java, etc." + }, +#endif + + /* EOF */ + {0} +}; + +struct flb_input_plugin in_tail_plugin = { + .name = "tail", + .description = "Tail files", + .cb_init = in_tail_init, + .cb_pre_run = in_tail_pre_run, + .cb_collect = NULL, + .cb_flush_buf = NULL, + .cb_pause = in_tail_pause, + .cb_resume = in_tail_resume, + .cb_exit = in_tail_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/fluent-bit/plugins/in_tail/tail.h b/fluent-bit/plugins/in_tail/tail.h new file mode 100644 index 000000000..074f7a49f --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_H +#define FLB_TAIL_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> + +/* Internal return values */ +#define FLB_TAIL_ERROR -1 +#define FLB_TAIL_OK 0 +#define FLB_TAIL_WAIT 1 +#define FLB_TAIL_BUSY 2 + +/* Consuming mode */ +#define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */ +#define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */ + +/* Config */ +#define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */ +#define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */ +#define FLB_TAIL_ROTATE_WAIT "5" /* time to monitor after rotation */ +#define FLB_TAIL_STATIC_BATCH_SIZE "50M" /* static batch size */ +#define FLB_TAIL_EVENT_BATCH_SIZE "50M" /* event batch size */ + +int in_tail_collect_event(void *file, struct flb_config *config); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_config.c b/fluent-bit/plugins/in_tail/tail_config.c new file mode 100644 index 000000000..360b3dbea --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_config.c @@ -0,0 +1,472 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#include <stdlib.h> +#include <fcntl.h> + +#include "tail_fs.h" +#include "tail_db.h" +#include "tail_config.h" +#include "tail_scan.h" +#include "tail_sql.h" +#include "tail_dockermode.h" + +#ifdef FLB_HAVE_PARSER +#include "tail_multiline.h" +#endif + +static int multiline_load_parsers(struct flb_tail_config *ctx) +{ + struct mk_list *head; + struct mk_list *head_p; + struct flb_config_map_val *mv; + struct flb_slist_entry *val = NULL; + struct flb_ml_parser_ins *parser_i; + + if (!ctx->multiline_parsers) { + return 0; + } + + /* Create Multiline context using the plugin instance name */ + ctx->ml_ctx = flb_ml_create(ctx->config, ctx->ins->name); + if (!ctx->ml_ctx) { + return -1; + } + + /* + * Iterate all 'multiline.parser' entries. Every entry is considered + * a group which can have multiple multiline parser instances. + */ + flb_config_map_foreach(head, mv, ctx->multiline_parsers) { + mk_list_foreach(head_p, mv->val.list) { + val = mk_list_entry(head_p, struct flb_slist_entry, _head); + + /* Create an instance of the defined parser */ + parser_i = flb_ml_parser_instance_create(ctx->ml_ctx, val->str); + if (!parser_i) { + return -1; + } + } + } + + return 0; +} + +struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + int sec; + int i; + long nsec; + const char *tmp; + struct flb_tail_config *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_tail_config)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->config = config; + ctx->ins = ins; + ctx->ignore_older = 0; + ctx->skip_long_lines = FLB_FALSE; +#ifdef FLB_HAVE_SQLDB + ctx->db_sync = 1; /* sqlite sync 'normal' */ +#endif + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Create the channel manager */ + ret = flb_pipe_create(ctx->ch_manager); + if (ret == -1) { + flb_errno(); + flb_free(ctx); + return NULL; + } + ctx->ch_reads = 0; + ctx->ch_writes = 0; + + /* Create the pending channel */ + ret = flb_pipe_create(ctx->ch_pending); + if (ret == -1) { + flb_errno(); + flb_tail_config_destroy(ctx); + return NULL; + } + /* Make pending channel non-blocking */ + for (i = 0; i <= 1; i++) { + ret = flb_pipe_set_nonblocking(ctx->ch_pending[i]); + if (ret == -1) { + flb_errno(); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Config: path/pattern to read files */ + if (!ctx->path_list || mk_list_size(ctx->path_list) == 0) { + flb_plg_error(ctx->ins, "no input 'path' was given"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* Config: seconds interval before to re-scan the path */ + tmp = flb_input_get_property("refresh_interval", ins); + if (!tmp) { + ctx->refresh_interval_sec = FLB_TAIL_REFRESH; + ctx->refresh_interval_nsec = 0; + } + else { + ret = flb_utils_time_split(tmp, &sec, &nsec); + if (ret == 0) { + ctx->refresh_interval_sec = sec; + ctx->refresh_interval_nsec = nsec; + + if (sec == 0 && nsec == 0) { + flb_plg_error(ctx->ins, "invalid 'refresh_interval' config " + "value (%s)", tmp); + flb_free(ctx); + return NULL; + } + + if (sec == 0 && nsec <= 1000000) { + flb_plg_warn(ctx->ins, "very low refresh_interval " + "(%i.%lu nanoseconds) might cause high CPU usage", + sec, nsec); + } + } + else { + flb_plg_error(ctx->ins, + "invalid 'refresh_interval' config value (%s)", + tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Config: seconds interval to monitor file after rotation */ + if (ctx->rotate_wait <= 0) { + flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value"); + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_PARSER + /* Config: multi-line support */ + if (ctx->multiline == FLB_TRUE) { + ret = flb_tail_mult_create(ctx, ins, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return NULL; + } + } +#endif + + /* Config: Docker mode */ + if(ctx->docker_mode == FLB_TRUE) { + ret = flb_tail_dmode_create(ctx, ins, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Validate buffer limit */ + if (ctx->buf_chunk_size > ctx->buf_max_size) { + flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk"); + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_REGEX + /* Parser / Format */ + tmp = flb_input_get_property("parser", ins); + if (tmp) { + ctx->parser = flb_parser_get(tmp, config); + if (!ctx->parser) { + flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); + } + } +#endif + + mk_list_init(&ctx->files_static); + mk_list_init(&ctx->files_event); + mk_list_init(&ctx->files_rotated); + + /* hash table for files lookups */ + ctx->static_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); + if (!ctx->static_hash) { + flb_plg_error(ctx->ins, "could not create static hash"); + flb_tail_config_destroy(ctx); + return NULL; + } + + ctx->event_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); + if (!ctx->event_hash) { + flb_plg_error(ctx->ins, "could not create event hash"); + flb_tail_config_destroy(ctx); + return NULL; + } + +#ifdef FLB_HAVE_SQLDB + ctx->db = NULL; +#endif + +#ifdef FLB_HAVE_REGEX + tmp = flb_input_get_property("tag_regex", ins); + if (tmp) { + ctx->tag_regex = flb_regex_create(tmp); + if (ctx->tag_regex) { + ctx->dynamic_tag = FLB_TRUE; + } + else { + flb_plg_error(ctx->ins, "invalid 'tag_regex' config value"); + } + } + else { + ctx->tag_regex = NULL; + } +#endif + + /* Check if it should use dynamic tags */ + tmp = strchr(ins->tag, '*'); + if (tmp) { + ctx->dynamic_tag = FLB_TRUE; + } + +#ifdef FLB_HAVE_SQLDB + /* Database options (needs to be set before the context) */ + tmp = flb_input_get_property("db.sync", ins); + if (tmp) { + if (strcasecmp(tmp, "extra") == 0) { + ctx->db_sync = 3; + } + else if (strcasecmp(tmp, "full") == 0) { + ctx->db_sync = 2; + } + else if (strcasecmp(tmp, "normal") == 0) { + ctx->db_sync = 1; + } + else if (strcasecmp(tmp, "off") == 0) { + ctx->db_sync = 0; + } + else { + flb_plg_error(ctx->ins, "invalid database 'db.sync' value"); + } + } + + /* Initialize database */ + tmp = flb_input_get_property("db", ins); + if (tmp) { + ctx->db = flb_tail_db_open(tmp, ins, ctx, config); + if (!ctx->db) { + flb_plg_error(ctx->ins, "could not open/create database"); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Journal mode check */ + tmp = flb_input_get_property("db.journal_mode", ins); + if (tmp) { + if (strcasecmp(tmp, "DELETE") != 0 && + strcasecmp(tmp, "TRUNCATE") != 0 && + strcasecmp(tmp, "PERSIST") != 0 && + strcasecmp(tmp, "MEMORY") != 0 && + strcasecmp(tmp, "WAL") != 0 && + strcasecmp(tmp, "OFF") != 0) { + + flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Prepare Statement */ + if (ctx->db) { + /* SQL_GET_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_GET_FILE, + -1, + &ctx->stmt_get_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_INSERT_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_INSERT_FILE, + -1, + &ctx->stmt_insert_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_ROTATE_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_ROTATE_FILE, + -1, + &ctx->stmt_rotate_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_UPDATE_OFFSET */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_UPDATE_OFFSET, + -1, + &ctx->stmt_offset, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_DELETE_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_DELETE_FILE, + -1, + &ctx->stmt_delete_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + } +#endif + +#ifdef FLB_HAVE_PARSER + /* Multiline core API */ + if (ctx->multiline_parsers && mk_list_size(ctx->multiline_parsers) > 0) { + ret = multiline_load_parsers(ctx); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not load multiline parsers"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* Enable auto-flush routine */ + ret = flb_ml_auto_flush_init(ctx->ml_ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not start multiline auto-flush"); + flb_tail_config_destroy(ctx); + return NULL; + } + flb_plg_info(ctx->ins, "multiline core started"); + } +#endif + +#ifdef FLB_HAVE_METRICS + ctx->cmt_files_opened = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_opened_total", + "Total number of opened files", + 1, (char *[]) {"name"}); + + ctx->cmt_files_closed = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_closed_total", + "Total number of closed files", + 1, (char *[]) {"name"}); + + ctx->cmt_files_rotated = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_rotated_total", + "Total number of rotated files", + 1, (char *[]) {"name"}); + + /* OLD metrics */ + flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, + "files_opened", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED, + "files_closed", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED, + "files_rotated", ctx->ins->metrics); +#endif + + return ctx; +} + +int flb_tail_config_destroy(struct flb_tail_config *config) +{ + +#ifdef FLB_HAVE_PARSER + flb_tail_mult_destroy(config); + + if (config->ml_ctx) { + flb_ml_destroy(config->ml_ctx); + } +#endif + + /* Close pipe ends */ + flb_pipe_close(config->ch_manager[0]); + flb_pipe_close(config->ch_manager[1]); + flb_pipe_close(config->ch_pending[0]); + flb_pipe_close(config->ch_pending[1]); + +#ifdef FLB_HAVE_REGEX + if (config->tag_regex) { + flb_regex_destroy(config->tag_regex); + } +#endif + +#ifdef FLB_HAVE_SQLDB + if (config->db != NULL) { + sqlite3_finalize(config->stmt_get_file); + sqlite3_finalize(config->stmt_insert_file); + sqlite3_finalize(config->stmt_delete_file); + sqlite3_finalize(config->stmt_rotate_file); + sqlite3_finalize(config->stmt_offset); + flb_tail_db_close(config->db); + } +#endif + + if (config->static_hash) { + flb_hash_table_destroy(config->static_hash); + } + if (config->event_hash) { + flb_hash_table_destroy(config->event_hash); + } + + flb_free(config); + return 0; +} diff --git a/fluent-bit/plugins/in_tail/tail_config.h b/fluent-bit/plugins/in_tail/tail_config.h new file mode 100644 index 000000000..dcfa54e02 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_config.h @@ -0,0 +1,168 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_CONFIG_H +#define FLB_TAIL_CONFIG_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_parser.h> +#include <fluent-bit/flb_macros.h> +#include <fluent-bit/flb_sqldb.h> +#include <fluent-bit/flb_metrics.h> +#include <fluent-bit/flb_log_event.h> +#ifdef FLB_HAVE_REGEX +#include <fluent-bit/flb_regex.h> +#endif +#ifdef FLB_HAVE_PARSER +#include <fluent-bit/multiline/flb_ml.h> +#endif + +#include <xxhash.h> + +/* Metrics */ +#ifdef FLB_HAVE_METRICS +#define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */ +#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */ +#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */ +#endif + +struct flb_tail_config { + int fd_notify; /* inotify fd */ + flb_pipefd_t ch_manager[2]; /* pipe: channel manager */ + flb_pipefd_t ch_pending[2]; /* pipe: pending events */ + int ch_reads; /* count number if signal reads */ + int ch_writes; /* count number of signal writes */ + + /* Buffer Config */ + size_t buf_chunk_size; /* allocation chunks */ + size_t buf_max_size; /* max size of a buffer */ + + /* Static files processor */ + size_t static_batch_size; + + /* Event files processor */ + size_t event_batch_size; + + /* Collectors */ + int coll_fd_static; + int coll_fd_scan; + int coll_fd_watcher; + int coll_fd_rotated; + int coll_fd_pending; + int coll_fd_inactive; + int coll_fd_dmode_flush; + int coll_fd_mult_flush; + int coll_fd_progress_check; + + /* Backend collectors */ + int coll_fd_fs1; /* used by fs_inotify & fs_stat */ + int coll_fd_fs2; /* only used by fs_stat */ + + /* Configuration */ + int dynamic_tag; /* dynamic tag ? e.g: abc.* */ +#ifdef FLB_HAVE_REGEX + struct flb_regex *tag_regex;/* path to tag regex */ +#endif + int refresh_interval_sec; /* seconds to re-scan */ + long refresh_interval_nsec;/* nanoseconds to re-scan */ + int read_from_head; /* read new files from head */ + int rotate_wait; /* sec to wait on rotated files */ + int watcher_interval; /* watcher interval */ + int ignore_older; /* ignore fields older than X seconds */ + time_t last_pending; /* last time a 'pending signal' was emitted' */ + struct mk_list *path_list; /* list of paths to scan (glob) */ + flb_sds_t path_key; /* key name of file path */ + flb_sds_t key; /* key for unstructured record */ + int skip_long_lines; /* skip long lines */ + int skip_empty_lines; /* skip empty lines (off) */ + int exit_on_eof; /* exit fluent-bit on EOF, test */ + + int progress_check_interval; /* watcher interval */ + int progress_check_interval_nsec; /* watcher interval */ + +#ifdef FLB_HAVE_INOTIFY + int inotify_watcher; /* enable/disable inotify monitor */ +#endif + flb_sds_t offset_key; /* key name of file offset */ + + /* Database */ +#ifdef FLB_HAVE_SQLDB + struct flb_sqldb *db; + int db_sync; + int db_locking; + flb_sds_t db_journal_mode; + sqlite3_stmt *stmt_get_file; + sqlite3_stmt *stmt_insert_file; + sqlite3_stmt *stmt_delete_file; + sqlite3_stmt *stmt_rotate_file; + sqlite3_stmt *stmt_offset; +#endif + + /* Parser / Format */ + struct flb_parser *parser; + + /* Multiline */ + int multiline; /* multiline enabled ? */ + int multiline_flush; /* multiline flush/wait */ + struct flb_parser *mult_parser_firstline; + struct mk_list mult_parsers; + + /* Docker mode */ + int docker_mode; /* Docker mode enabled ? */ + int docker_mode_flush; /* Docker mode flush/wait */ + struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */ + + /* Multiline core engine */ + struct flb_ml *ml_ctx; + struct mk_list *multiline_parsers; + + uint64_t files_static_count; /* number of items in the static file list */ + struct mk_list files_static; + struct mk_list files_event; + + /* List of rotated files that needs to be removed after 'rotate_wait' */ + struct mk_list files_rotated; + + /* List of shell patterns used to exclude certain file names */ + struct mk_list *exclude_list; + + /* Plugin input instance */ + struct flb_input_instance *ins; + + struct flb_log_event_encoder log_event_encoder; + struct flb_log_event_decoder log_event_decoder; + + /* Metrics */ + struct cmt_counter *cmt_files_opened; + struct cmt_counter *cmt_files_closed; + struct cmt_counter *cmt_files_rotated; + + /* Hash: hash tables for quick acess to registered files */ + struct flb_hash_table *static_hash; + struct flb_hash_table *event_hash; + + struct flb_config *config; +}; + +struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, + struct flb_config *config); +int flb_tail_config_destroy(struct flb_tail_config *config); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_db.c b/fluent-bit/plugins/in_tail/tail_db.c new file mode 100644 index 000000000..664963b6d --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_db.c @@ -0,0 +1,277 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_sqldb.h> + +#include "tail_db.h" +#include "tail_sql.h" +#include "tail_file.h" + +struct query_status { + int id; + int rows; + int64_t offset; +}; + +/* Open or create database required by tail plugin */ +struct flb_sqldb *flb_tail_db_open(const char *path, + struct flb_input_instance *in, + struct flb_tail_config *ctx, + struct flb_config *config) +{ + int ret; + char tmp[64]; + struct flb_sqldb *db; + + /* Open/create the database */ + db = flb_sqldb_open(path, in->name, config); + if (!db) { + return NULL; + } + + /* Create table schema if it don't exists */ + ret = flb_sqldb_query(db, SQL_CREATE_FILES, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not create 'in_tail_files' table"); + flb_sqldb_close(db); + return NULL; + } + + if (ctx->db_sync >= 0) { + snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, + ctx->db_sync); + ret = flb_sqldb_query(db, tmp, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db could not set pragma 'sync'"); + flb_sqldb_close(db); + return NULL; + } + } + + if (ctx->db_locking == FLB_TRUE) { + ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'"); + flb_sqldb_close(db); + return NULL; + } + } + + if (ctx->db_journal_mode) { + snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE, + ctx->db_journal_mode); + ret = flb_sqldb_query(db, tmp, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'"); + flb_sqldb_close(db); + return NULL; + } + } + + return db; +} + +int flb_tail_db_close(struct flb_sqldb *db) +{ + flb_sqldb_close(db); + return 0; +} + +/* + * Check if an file inode exists in the database. Return FLB_TRUE or + * FLB_FALSE + */ +static int db_file_exists(struct flb_tail_file *file, + struct flb_tail_config *ctx, + uint64_t *id, uint64_t *inode, off_t *offset) +{ + int ret; + int exists = FLB_FALSE; + + /* Bind parameters */ + sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode); + ret = sqlite3_step(ctx->stmt_get_file); + + if (ret == SQLITE_ROW) { + exists = FLB_TRUE; + + /* id: column 0 */ + *id = sqlite3_column_int64(ctx->stmt_get_file, 0); + + /* offset: column 2 */ + *offset = sqlite3_column_int64(ctx->stmt_get_file, 2); + + /* inode: column 3 */ + *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); + } + else if (ret == SQLITE_DONE) { + /* all good */ + } + else { + exists = -1; + } + + sqlite3_clear_bindings(ctx->stmt_get_file); + sqlite3_reset(ctx->stmt_get_file); + + return exists; + +} + +static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ctx) + +{ + int ret; + time_t created; + + /* Register the file */ + created = time(NULL); + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_insert_file, 1, file->name, -1, 0); + sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset); + sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode); + sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); + + /* Run the insert */ + ret = sqlite3_step(ctx->stmt_insert_file); + if (ret != SQLITE_DONE) { + sqlite3_clear_bindings(ctx->stmt_insert_file); + sqlite3_reset(ctx->stmt_insert_file); + flb_plg_error(ctx->ins, "cannot execute insert file %s inode=%lu", + file->name, file->inode); + return -1; + } + + sqlite3_clear_bindings(ctx->stmt_insert_file); + sqlite3_reset(ctx->stmt_insert_file); + + /* Get the database ID for this file */ + return flb_sqldb_last_id(ctx->db); +} + +int flb_tail_db_file_set(struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + uint64_t id = 0; + off_t offset = 0; + uint64_t inode = 0; + + /* Check if the file exists */ + ret = db_file_exists(file, ctx, &id, &inode, &offset); + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot execute query to check inode: %lu", + file->inode); + return -1; + } + + if (ret == FLB_FALSE) { + /* Get the database ID for this file */ + file->db_id = db_file_insert(file, ctx); + } + else { + file->db_id = id; + file->offset = offset; + } + + return 0; +} + +/* Update Offset v2 */ +int flb_tail_db_file_offset(struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset); + sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id); + + ret = sqlite3_step(ctx->stmt_offset); + + if (ret != SQLITE_DONE) { + sqlite3_clear_bindings(ctx->stmt_offset); + sqlite3_reset(ctx->stmt_offset); + return -1; + } + + /* Verify number of updated rows */ + ret = sqlite3_changes(ctx->db->handler); + if (ret == 0) { + /* + * 'someone' like you 'the reader' or another user has deleted the database + * entry, just restore it. + */ + file->db_id = db_file_insert(file, ctx); + } + + sqlite3_clear_bindings(ctx->stmt_offset); + sqlite3_reset(ctx->stmt_offset); + + return 0; +} + +/* Mark a file as rotated v2 */ +int flb_tail_db_file_rotate(const char *new_name, + struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0); + sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id); + + ret = sqlite3_step(ctx->stmt_rotate_file); + + sqlite3_clear_bindings(ctx->stmt_rotate_file); + sqlite3_reset(ctx->stmt_rotate_file); + + if (ret != SQLITE_DONE) { + return -1; + } + + return 0; +} + +/* Delete file entry from the database */ +int flb_tail_db_file_delete(struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id); + ret = sqlite3_step(ctx->stmt_delete_file); + + sqlite3_clear_bindings(ctx->stmt_delete_file); + sqlite3_reset(ctx->stmt_delete_file); + + if (ret != SQLITE_DONE) { + flb_plg_error(ctx->ins, "db: error deleting entry from database: %s", + file->name); + return -1; + } + + flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); + return 0; +} diff --git a/fluent-bit/plugins/in_tail/tail_db.h b/fluent-bit/plugins/in_tail/tail_db.h new file mode 100644 index 000000000..7b5355d22 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_db.h @@ -0,0 +1,43 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_DB_H +#define FLB_TAIL_DB_H + +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_sqldb.h> + +#include "tail_file.h" + +struct flb_sqldb *flb_tail_db_open(const char *path, + struct flb_input_instance *in, + struct flb_tail_config *ctx, + struct flb_config *config); + +int flb_tail_db_close(struct flb_sqldb *db); +int flb_tail_db_file_set(struct flb_tail_file *file, + struct flb_tail_config *ctx); +int flb_tail_db_file_offset(struct flb_tail_file *file, + struct flb_tail_config *ctx); +int flb_tail_db_file_rotate(const char *new_name, + struct flb_tail_file *file, + struct flb_tail_config *ctx); +int flb_tail_db_file_delete(struct flb_tail_file *file, + struct flb_tail_config *ctx); +#endif diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.c b/fluent-bit/plugins/in_tail/tail_dockermode.c new file mode 100644 index 000000000..a8f20f9cd --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_dockermode.c @@ -0,0 +1,459 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_unescape.h> + +#include "tail_config.h" +#include "tail_dockermode.h" +#include "tail_file_internal.h" + +int flb_tail_dmode_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, + struct flb_config *config) +{ + const char *tmp; + + if (ctx->multiline == FLB_TRUE) { + flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline " + "is enabled"); + return -1; + } + +#ifdef FLB_HAVE_REGEX + /* First line Parser */ + tmp = flb_input_get_property("docker_mode_parser", ins); + if (tmp) { + ctx->docker_mode_parser = flb_parser_get(tmp, config); + if (!ctx->docker_mode_parser) { + flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); + } + } + else { + ctx->docker_mode_parser = NULL; + } +#endif + + tmp = flb_input_get_property("docker_mode_flush", ins); + if (!tmp) { + ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH; + } + else { + ctx->docker_mode_flush = atoi(tmp); + if (ctx->docker_mode_flush <= 0) { + ctx->docker_mode_flush = 1; + } + } + + return 0; +} + +static int modify_json_cond(char *js, size_t js_len, + char **val, size_t *val_len, + char **out, size_t *out_len, + int cond(char*, size_t), + int mod(char*, size_t, char**, size_t*, void*), void *data) +{ + int ret; + struct flb_pack_state state; + jsmntok_t *t; + jsmntok_t *t_val = NULL; + int i; + int i_root = -1; + int i_key = -1; + char *old_val; + size_t old_val_len; + char *new_val = NULL; + size_t new_val_len = 0; + size_t mod_len; + + ret = flb_pack_state_init(&state); + if (ret != 0) { + ret = -1; + goto modify_json_cond_end; + } + + ret = flb_json_tokenise(js, js_len, &state); + if (ret != 0 || state.tokens_count == 0) { + ret = -1; + goto modify_json_cond_end; + } + + for (i = 0; i < state.tokens_count; i++) { + t = &state.tokens[i]; + + if (i_key >= 0) { + if (t->parent == i_key) { + if (t->type == JSMN_STRING) { + t_val = t; + } + break; + } + continue; + } + + if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) { + i_root = i; + continue; + } + if (i_root == -1) { + continue; + } + + if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) { + i_key = i; + } + } + + if (!t_val) { + ret = -1; + goto modify_json_cond_end; + } + + *out = js; + *out_len = js_len; + + if (val) { + *val = js + t_val->start; + } + if (val_len) { + *val_len = t_val->end - t_val->start; + } + + if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) { + old_val = js + t_val->start; + old_val_len = t_val->end - t_val->start; + ret = mod(old_val, old_val_len, &new_val, &new_val_len, data); + if (ret != 0) { + ret = -1; + goto modify_json_cond_end; + } + + ret = 1; + + if (new_val == old_val) { + goto modify_json_cond_end; + } + + mod_len = js_len + new_val_len - old_val_len; + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + flb_free(new_val); + ret = -1; + goto modify_json_cond_end; + } + *out_len = mod_len; + + memcpy(*out, js, t_val->start); + memcpy(*out + t_val->start, new_val, new_val_len); + memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end); + + flb_free(new_val); + } + + modify_json_cond_end: + flb_pack_state_reset(&state); + if (ret < 0) { + *out = NULL; + } + return ret; +} + +static int unesc_ends_with_nl(char *str, size_t len) +{ + char* unesc; + int unesc_len; + int nl; + + unesc = flb_malloc(len + 1); + if (!unesc) { + flb_errno(); + return FLB_FALSE; + } + unesc_len = flb_unescape_string(str, len, &unesc); + nl = unesc[unesc_len - 1] == '\n'; + flb_free(unesc); + return nl; +} + +static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data) +{ + flb_sds_t sds = data; + + if (flb_sds_len(sds) == 0) { + *out = str; + *out_len = len; + return 0; + } + + size_t mod_len = flb_sds_len(sds) + len; + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + return -1; + } + *out_len = mod_len; + + memcpy(*out, sds, flb_sds_len(sds)); + memcpy(*out + flb_sds_len(sds), str, len); + return 0; +} + +static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data) +{ + flb_sds_t sds = data; + size_t mod_len = flb_sds_len(sds); + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + return -1; + } + *out_len = mod_len; + + memcpy(*out, sds, flb_sds_len(sds)); + return 0; +} + +int flb_tail_dmode_process_content(time_t now, + char* line, size_t line_len, + char **repl_line, size_t *repl_line_len, + struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + char* val = NULL; + size_t val_len; + int ret; + void *out_buf = NULL; + size_t out_size; + struct flb_time out_time = {0}; + *repl_line = NULL; + *repl_line_len = 0; + flb_sds_t tmp; + flb_sds_t tmp_copy; + +#ifdef FLB_HAVE_REGEX + if (ctx->docker_mode_parser) { + ret = flb_parser_do(ctx->docker_mode_parser, line, line_len, + &out_buf, &out_size, &out_time); + flb_free(out_buf); + + /* + * Set dmode_firstline if the line meets the first-line requirement + */ + if(ret >= 0) { + file->dmode_firstline = true; + } + + /* + * Process if buffer contains full log line + */ + if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) { + /* + * Buffered log should be flushed out + * as current line meets first-line requirement + */ + if(ret >= 0) { + flb_tail_dmode_flush(file, ctx); + } + + /* + * Flush the buffer if multiline has not been detected yet + */ + if (!file->dmode_firstline) { + flb_tail_dmode_flush(file, ctx); + } + } + } +#endif + + ret = modify_json_cond(line, line_len, + &val, &val_len, + repl_line, repl_line_len, + unesc_ends_with_nl, + prepend_sds_to_str, file->dmode_buf); + if (ret >= 0) { + /* line is a valid json */ + flb_sds_len_set(file->dmode_lastline, 0); + + /* concatenate current log line with buffered one */ + tmp = flb_sds_cat(file->dmode_buf, val, val_len); + if (!tmp) { + flb_errno(); + return -1; + } + file->dmode_buf = tmp; + + tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len); + if (!tmp_copy) { + flb_errno(); + return -1; + } + + file->dmode_lastline = tmp_copy; + file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1); + + if (ret == 0) { + /* Line not ended with newline */ + file->dmode_complete = false; + } + else { + /* Line ended with newline */ + file->dmode_complete = true; +#ifdef FLB_HAVE_REGEX + if (!ctx->docker_mode_parser) { + flb_tail_dmode_flush(file, ctx); + } +#else + flb_tail_dmode_flush(file, ctx); +#endif + } + } + return ret; +} + +void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) +{ + int ret; + char *repl_line = NULL; + size_t repl_line_len = 0; + void *out_buf = NULL; + size_t out_size; + struct flb_time out_time = {0}; + time_t now = time(NULL); + + if (flb_sds_len(file->dmode_lastline) == 0) { + return; + } + + flb_time_zero(&out_time); + + ret = modify_json_cond(file->dmode_lastline, + flb_sds_len(file->dmode_lastline), + NULL, NULL, + &repl_line, &repl_line_len, + NULL, + use_sds, file->dmode_buf); + if (ret < 0) { + return; + } + + flb_sds_len_set(file->dmode_buf, 0); + flb_sds_len_set(file->dmode_lastline, 0); + file->dmode_flush_timeout = 0; + +#ifdef FLB_HAVE_REGEX + if (ctx->parser) { + ret = flb_parser_do(ctx->parser, repl_line, repl_line_len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_double(&out_time) == 0) { + flb_time_get(&out_time); + } + if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) { + goto dmode_flush_end; + } + + flb_tail_pack_line_map(&out_time, (char**) &out_buf, &out_size, file, 0); + + goto dmode_flush_end; + } + } +#endif + + flb_tail_file_pack_line(NULL, repl_line, repl_line_len, file, 0); + + dmode_flush_end: + flb_free(repl_line); + flb_free(out_buf); +} + +static void file_pending_flush(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t now) +{ + if (file->dmode_flush_timeout > now) { + return; + } + + if (flb_sds_len(file->dmode_lastline) == 0) { + return; + } + + flb_tail_dmode_flush(file, ctx); + + if (file->sl_log_event_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->sl_log_event_encoder->output_buffer, + file->sl_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->sl_log_event_encoder); + } +} + +int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx) +{ + time_t expired; + struct mk_list *head; + struct flb_tail_file *file; + + expired = time(NULL) + 3600; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + return 0; +} + +int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + time_t now; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + + now = time(NULL); + + /* Iterate static event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); + } + + return 0; +} diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.h b/fluent-bit/plugins/in_tail/tail_dockermode.h new file mode 100644 index 000000000..50869ff62 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_dockermode.h @@ -0,0 +1,38 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_DOCKERMODE_H +#define FLB_TAIL_DOCKERMODE_H + +#include "tail_file.h" +#define FLB_TAIL_DMODE_FLUSH 4 + +int flb_tail_dmode_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, struct flb_config *config); +int flb_tail_dmode_process_content(time_t now, + char* line, size_t line_len, + char **repl_line, size_t *repl_line_len, + struct flb_tail_file *file, + struct flb_tail_config *ctx); +void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx); +int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context); +int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c new file mode 100644 index 000000000..2385f0626 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_file.c @@ -0,0 +1,1860 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <time.h> +#ifdef FLB_SYSTEM_FREEBSD +#include <sys/user.h> +#include <libutil.h> +#endif + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_parser.h> +#ifdef FLB_HAVE_REGEX +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_hash_table.h> +#endif + +#include "tail.h" +#include "tail_file.h" +#include "tail_config.h" +#include "tail_db.h" +#include "tail_signal.h" +#include "tail_dockermode.h" +#include "tail_multiline.h" +#include "tail_scan.h" + +#ifdef FLB_SYSTEM_WINDOWS +#include "win32.h" +#endif + +#include <cfl/cfl.h> + +static inline void consume_bytes(char *buf, int bytes, int length) +{ + memmove(buf, buf + bytes, length - bytes); +} + +static uint64_t stat_get_st_dev(struct stat *st) +{ +#ifdef FLB_SYSTEM_WINDOWS + /* do you want to contribute with a way to extract volume serial number ? */ + return 0; +#else + return st->st_dev; +#endif +} + +static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st, + uint64_t *out_hash) +{ + int len; + uint64_t st_dev; + char tmp[64]; + + st_dev = stat_get_st_dev(st); + + len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64, + st_dev, (uint64_t)st->st_ino); + + *out_hash = cfl_hash_64bits(tmp, len); + return 0; +} + +static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st, + flb_sds_t *key) +{ + uint64_t st_dev; + flb_sds_t tmp; + flb_sds_t buf; + + buf = flb_sds_create_size(64); + if (!buf) { + return -1; + } + + st_dev = stat_get_st_dev(st); + tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64, + st_dev, (uint64_t)st->st_ino); + if (!tmp) { + flb_sds_destroy(buf); + return -1; + } + + *key = buf; + return 0; +} + +/* Append custom keys and report the number of records processed */ +static int record_append_custom_keys(struct flb_tail_file *file, + char *in_data, size_t in_size, + char **out_data, size_t *out_size) +{ + int i; + int ret; + int records = 0; + msgpack_object k; + msgpack_object v; + struct flb_log_event event; + struct flb_tail_config *ctx; + struct flb_log_event_encoder encoder; + struct flb_log_event_decoder decoder; + + ctx = (struct flb_tail_config *) file->config; + + ret = flb_log_event_decoder_init(&decoder, in_data, in_size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_decoder_destroy(&decoder); + + return -2; + } + + while (flb_log_event_decoder_next(&decoder, &event) == + FLB_EVENT_DECODER_SUCCESS) { + + ret = flb_log_event_encoder_begin_record(&encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp); + } + + /* append previous map keys */ + for (i = 0; i < event.body->via.map.size; i++) { + k = event.body->via.map.ptr[i].key; + v = event.body->via.map.ptr[i].val; + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_msgpack_object( + &encoder, + &k); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_msgpack_object( + &encoder, + &v); + } + } + + /* path_key */ + if (ctx->path_key != NULL) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->config->path_key); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->orig_name); + } + } + + /* offset_key */ + if (ctx->offset_key != NULL) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->config->offset_key); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_uint64( + &encoder, + file->offset + + file->last_processed_bytes); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&encoder); + } + else { + flb_plg_error(file->config->ins, "error packing event : %d", ret); + + flb_log_event_encoder_rollback_record(&encoder); + } + + /* counter */ + records++; + } + + *out_data = encoder.output_buffer; + *out_size = encoder.output_length; + + /* This function transfers ownership of the internal memory allocated by + * sbuffer using msgpack_sbuffer_release which means the caller is + * responsible for releasing the memory. + */ + flb_log_event_encoder_claim_internal_buffer_ownership(&encoder); + + flb_log_event_decoder_destroy(&decoder); + flb_log_event_encoder_destroy(&encoder); + + return records; +} + +static int flb_tail_repack_map(struct flb_log_event_encoder *encoder, + char *data, + size_t data_size) +{ + msgpack_unpacked source_map; + size_t offset; + int result; + size_t index; + msgpack_object value; + msgpack_object key; + + result = FLB_EVENT_ENCODER_SUCCESS; + + if (data_size > 0) { + msgpack_unpacked_init(&source_map); + + offset = 0; + result = msgpack_unpack_next(&source_map, + data, + data_size, + &offset); + + if (result == MSGPACK_UNPACK_SUCCESS) { + result = FLB_EVENT_ENCODER_SUCCESS; + } + else { + result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; + } + + for (index = 0; + index < source_map.data.via.map.size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + key = source_map.data.via.map.ptr[index].key; + value = source_map.data.via.map.ptr[index].val; + + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &key); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &value); + } + } + + msgpack_unpacked_destroy(&source_map); + } + + return result; +} + +int flb_tail_pack_line_map(struct flb_time *time, char **data, + size_t *data_size, struct flb_tail_file *file, + size_t processed_bytes) +{ + int result; + + result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->sl_log_event_encoder, time); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_tail_repack_map(file->sl_log_event_encoder, + *data, + *data_size); + } + + /* path_key */ + if (file->config->path_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), + FLB_LOG_EVENT_STRING_VALUE(file->orig_name, + file->orig_name_len)); + } + } + + /* offset_key */ + if (file->config->offset_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), + FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); + } + else { + flb_log_event_encoder_rollback_record(file->sl_log_event_encoder); + } + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, "error packing event"); + + return -1; + } + + return 0; +} + +int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, + struct flb_tail_file *file, size_t processed_bytes) +{ + int result; + + result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->sl_log_event_encoder, time); + } + + /* path_key */ + if (file->config->path_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), + FLB_LOG_EVENT_STRING_VALUE(file->orig_name, + file->orig_name_len)); + } + } + + /* offset_key */ + if (file->config->offset_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), + FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->key), + FLB_LOG_EVENT_STRING_VALUE(data, + data_size)); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); + } + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, "error packing event : %d", result); + + return -1; + } + + return 0; +} + +static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size) +{ + int result; + + result = flb_log_event_encoder_emit_raw_record( + file->ml_log_event_encoder, + buf_data, buf_size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, + "log event raw append error : %d", + result); + + return -1; + } + + return 0; +} + +static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ + if (file->ml_log_event_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->ml_log_event_encoder->output_buffer, + file->ml_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->ml_log_event_encoder); + } + + return 0; +} + +static int process_content(struct flb_tail_file *file, size_t *bytes) +{ + size_t len; + int lines = 0; + int ret; + size_t processed_bytes = 0; + char *data; + char *end; + char *p; + void *out_buf; + size_t out_size; + int crlf; + char *line; + size_t line_len; + char *repl_line; + size_t repl_line_len; + time_t now = time(NULL); + struct flb_time out_time = {0}; + struct flb_tail_config *ctx; + + ctx = (struct flb_tail_config *) file->config; + + /* Parse the data content */ + data = file->buf_data; + end = data + file->buf_len; + + /* reset last processed bytes */ + file->last_processed_bytes = 0; + + /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */ + while (data < end && *data == '\0') { + data++; + processed_bytes++; + } + + while (data < end && (p = memchr(data, '\n', end - data))) { + len = (p - data); + crlf = 0; + if (file->skip_next == FLB_TRUE) { + data += len + 1; + processed_bytes += len + 1; + file->skip_next = FLB_FALSE; + continue; + } + + /* + * Empty line (just breakline) + * --------------------------- + * [NOTE] with the new Multiline core feature and Multiline Filter on + * Fluent Bit v1.8.2, there are a couple of cases where stack traces + * or multi line patterns expects an empty line (meaning only the + * breakline), skipping empty lines on this plugin will break that + * functionality. + * + * We are introducing 'skip_empty_lines=off' configuration + * property to revert this behavior if some user is affected by + * this change. + */ + + if (len == 0 && ctx->skip_empty_lines) { + data++; + processed_bytes++; + continue; + } + + /* Process '\r\n' */ + if (len >= 2) { + crlf = (data[len-1] == '\r'); + if (len == 1 && crlf) { + data += 2; + processed_bytes += 2; + continue; + } + } + + /* Reset time for each line */ + flb_time_zero(&out_time); + + line = data; + line_len = len - crlf; + repl_line = NULL; + + if (ctx->ml_ctx) { + ret = flb_ml_append_text(ctx->ml_ctx, + file->ml_stream_id, + &out_time, + line, + line_len); + goto go_next; + } + else if (ctx->docker_mode) { + ret = flb_tail_dmode_process_content(now, line, line_len, + &repl_line, &repl_line_len, + file, ctx); + if (ret >= 0) { + if (repl_line == line) { + repl_line = NULL; + } + else { + line = repl_line; + line_len = repl_line_len; + } + /* Skip normal parsers flow */ + goto go_next; + } + else { + flb_tail_dmode_flush(file, ctx); + } + } + +#ifdef FLB_HAVE_PARSER + if (ctx->parser) { + /* Common parser (non-multiline) */ + ret = flb_parser_do(ctx->parser, line, line_len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + + /* If multiline is enabled, flush any buffered data */ + if (ctx->multiline == FLB_TRUE) { + flb_tail_mult_flush(file, ctx); + } + + flb_tail_pack_line_map(&out_time, + (char**) &out_buf, &out_size, file, + processed_bytes); + + flb_free(out_buf); + } + else { + /* Parser failed, pack raw text */ + flb_tail_file_pack_line(NULL, data, len, file, processed_bytes); + } + } + else if (ctx->multiline == FLB_TRUE) { + ret = flb_tail_mult_process_content(now, + line, line_len, + file, ctx, processed_bytes); + + /* No multiline */ + if (ret == FLB_TAIL_MULT_NA) { + flb_tail_mult_flush(file, ctx); + + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); + } + else if (ret == FLB_TAIL_MULT_MORE) { + /* we need more data, do nothing */ + goto go_next; + } + else if (ret == FLB_TAIL_MULT_DONE) { + /* Finalized */ + } + } + else { + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); + } +#else + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); +#endif + + go_next: + flb_free(repl_line); + repl_line = NULL; + /* Adjust counters */ + data += len + 1; + processed_bytes += len + 1; + lines++; + file->parsed = 0; + file->last_processed_bytes += processed_bytes; + } + file->parsed = file->buf_len; + + if (lines > 0) { + /* Append buffer content to a chunk */ + *bytes = processed_bytes; + + if (file->sl_log_event_encoder->output_length > 0) { + flb_input_log_append_records(ctx->ins, + lines, + file->tag_buf, + file->tag_len, + file->sl_log_event_encoder->output_buffer, + file->sl_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->sl_log_event_encoder); + } + } + else if (file->skip_next) { + *bytes = file->buf_len; + } + else { + *bytes = processed_bytes; + } + + if (ctx->ml_ctx) { + ml_stream_buffer_flush(ctx, file); + } + + return lines; +} + +static inline void drop_bytes(char *buf, size_t len, int pos, int bytes) +{ + memmove(buf + pos, + buf + pos + bytes, + len - pos - bytes); +} + +#ifdef FLB_HAVE_REGEX +static void cb_results(const char *name, const char *value, + size_t vlen, void *data) +{ + struct flb_hash_table *ht = data; + + if (vlen == 0) { + return; + } + + flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen); +} +#endif + +#ifdef FLB_HAVE_REGEX +static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname, + char *out_buf, size_t *out_size, + struct flb_tail_config *ctx) +#else +static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size, + struct flb_tail_config *ctx) +#endif +{ + int i; + size_t len; + char *p; + size_t buf_s = 0; +#ifdef FLB_HAVE_REGEX + ssize_t n; + struct flb_regex_search result; + struct flb_hash_table *ht; + char *beg; + char *end; + int ret; + const char *tmp; + size_t tmp_s; +#endif + +#ifdef FLB_HAVE_REGEX + if (tag_regex) { + n = flb_regex_do(tag_regex, fname, strlen(fname), &result); + if (n <= 0) { + flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s", + fname); + return -1; + } + else { + ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, + FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE); + flb_regex_parse(tag_regex, &result, cb_results, ht); + + for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) { + if (beg != p) { + len = (beg - p); + memcpy(out_buf + buf_s, p, len); + buf_s += len; + } + + beg++; + + end = strchr(beg, '>'); + if (end && !memchr(beg, '<', end - beg)) { + end--; + + len = end - beg + 1; + ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s); + if (ret != -1) { + memcpy(out_buf + buf_s, tmp, tmp_s); + buf_s += tmp_s; + } + else { + memcpy(out_buf + buf_s, "_", 1); + buf_s++; + } + } + else { + flb_plg_error(ctx->ins, + "missing closing angle bracket in tag %s " + "at position %lu", tag, beg - tag); + flb_hash_table_destroy(ht); + return -1; + } + } + + flb_hash_table_destroy(ht); + if (*p) { + len = strlen(p); + memcpy(out_buf + buf_s, p, len); + buf_s += len; + } + } + } + else { +#endif + p = strchr(tag, '*'); + if (!p) { + return -1; + } + + /* Copy tag prefix if any */ + len = (p - tag); + if (len > 0) { + memcpy(out_buf, tag, len); + buf_s += len; + } + + /* Append file name */ + len = strlen(fname); + memcpy(out_buf + buf_s, fname, len); + buf_s += len; + + /* Tag suffix (if any) */ + p++; + if (*p) { + len = strlen(tag); + memcpy(out_buf + buf_s, p, (len - (p - tag))); + buf_s += (len - (p - tag)); + } + + /* Sanitize buffer */ + for (i = 0; i < buf_s; i++) { + if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') { + if (i > 0) { + out_buf[i] = '.'; + } + else { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + + if (i > 0 && out_buf[i] == '.') { + if (out_buf[i - 1] == '.') { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + else if (out_buf[i] == '*') { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + + /* Check for an ending '.' */ + if (out_buf[buf_s - 1] == '.') { + drop_bytes(out_buf, buf_s, buf_s - 1, 1); + buf_s--; + } +#ifdef FLB_HAVE_REGEX + } +#endif + + out_buf[buf_s] = '\0'; + *out_size = buf_s; + + return 0; +} + +static inline int flb_tail_file_exists(struct stat *st, + struct flb_tail_config *ctx) +{ + int ret; + uint64_t hash; + + ret = stat_to_hash_bits(ctx, st, &hash); + if (ret != 0) { + return -1; + } + + /* static hash */ + if (flb_hash_table_exists(ctx->static_hash, hash)) { + return FLB_TRUE; + } + + /* event hash */ + if (flb_hash_table_exists(ctx->event_hash, hash)) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * Based in the configuration or database offset, set the proper 'offset' for the + * file in question. + */ +static int set_file_position(struct flb_tail_config *ctx, + struct flb_tail_file *file) +{ + int64_t ret; + +#ifdef FLB_HAVE_SQLDB + /* + * If the database option is enabled, try to gather the file position. The + * database function updates the file->offset entry. + */ + if (ctx->db) { + ret = flb_tail_db_file_set(file, ctx); + if (ret == 0) { + if (file->offset > 0) { + ret = lseek(file->fd, file->offset, SEEK_SET); + if (ret == -1) { + flb_errno(); + return -1; + } + } + else if (ctx->read_from_head == FLB_FALSE) { + ret = lseek(file->fd, 0, SEEK_END); + if (ret == -1) { + flb_errno(); + return -1; + } + file->offset = ret; + flb_tail_db_file_offset(file, ctx); + } + return 0; + } + } +#endif + + if (ctx->read_from_head == FLB_TRUE) { + /* no need to seek, offset position is already zero */ + return 0; + } + + /* tail... */ + ret = lseek(file->fd, 0, SEEK_END); + if (ret == -1) { + flb_errno(); + return -1; + } + file->offset = ret; + + return 0; +} + +/* Multiline flush callback: invoked every time some content is complete */ +static int ml_flush_callback(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + int result; + size_t mult_size = 0; + char *mult_buf = NULL; + struct flb_tail_file *file = data; + struct flb_tail_config *ctx = file->config; + + if (ctx->path_key == NULL && ctx->offset_key == NULL) { + ml_stream_buffer_append(file, buf_data, buf_size); + } + else { + /* adjust the records in a new buffer */ + result = record_append_custom_keys(file, + buf_data, + buf_size, + &mult_buf, + &mult_size); + + if (result < 0) { + ml_stream_buffer_append(file, buf_data, buf_size); + } + else { + ml_stream_buffer_append(file, mult_buf, mult_size); + + flb_free(mult_buf); + } + } + + if (mst->forced_flush) { + ml_stream_buffer_flush(ctx, file); + } + + return 0; +} + +int flb_tail_file_append(char *path, struct stat *st, int mode, + struct flb_tail_config *ctx) +{ + int fd; + int ret; + uint64_t stream_id; + uint64_t ts; + uint64_t hash_bits; + flb_sds_t hash_key; + size_t len; + char *tag; + char *name; + size_t tag_len; + struct flb_tail_file *file; + struct stat lst; + flb_sds_t inode_str; + + if (!S_ISREG(st->st_mode)) { + return -1; + } + + if (flb_tail_file_exists(st, ctx) == FLB_TRUE) { + return -1; + } + + fd = open(path, O_RDONLY); + if (fd == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot open %s", path); + return -1; + } + + file = flb_calloc(1, sizeof(struct flb_tail_file)); + if (!file) { + flb_errno(); + goto error; + } + + /* Initialize */ + file->watch_fd = -1; + file->fd = fd; + + /* On non-windows environments check if the original path is a link */ + ret = lstat(path, &lst); + if (ret == 0) { + if (S_ISLNK(lst.st_mode)) { + file->is_link = FLB_TRUE; + file->link_inode = lst.st_ino; + } + } + + /* get unique hash for this file */ + ret = stat_to_hash_bits(ctx, st, &hash_bits); + if (ret != 0) { + flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path); + goto error; + } + file->hash_bits = hash_bits; + + /* store the hash key used for hash_bits */ + ret = stat_to_hash_key(ctx, st, &hash_key); + if (ret != 0) { + flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path); + goto error; + } + file->hash_key = hash_key; + + file->inode = st->st_ino; + file->offset = 0; + file->size = st->st_size; + file->buf_len = 0; + file->parsed = 0; + file->config = ctx; + file->tail_mode = mode; + file->tag_len = 0; + file->tag_buf = NULL; + file->rotated = 0; + file->pending_bytes = 0; + file->mult_firstline = FLB_FALSE; + file->mult_keys = 0; + file->mult_flush_timeout = 0; + file->mult_skipping = FLB_FALSE; + + /* + * Duplicate string into 'file' structure, the called function + * take cares to resolve real-name of the file in case we are + * running in a non-Linux system. + * + * Depending of the operating system, the way to obtain the file + * name associated to it file descriptor can have different behaviors + * specifically if it root path it's under a symbolic link. On Linux + * we can trust the file name but in others it's better to solve it + * with some extra calls. + */ + ret = flb_tail_file_name_dup(path, file); + if (!file->name) { + flb_errno(); + goto error; + } + + /* We keep a copy of the initial filename in orig_name. This is required + * for path_key to continue working after rotation. */ + file->orig_name = flb_strdup(file->name); + if (!file->orig_name) { + flb_free(file->name); + flb_errno(); + goto error; + } + file->orig_name_len = file->name_len; + + /* multiline msgpack buffers */ + file->mult_records = 0; + msgpack_sbuffer_init(&file->mult_sbuf); + msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, + msgpack_sbuffer_write); + + /* docker mode */ + file->dmode_flush_timeout = 0; + file->dmode_complete = true; + file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); + file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); + file->dmode_firstline = false; +#ifdef FLB_HAVE_SQLDB + file->db_id = 0; +#endif + file->skip_next = FLB_FALSE; + file->skip_warn = FLB_FALSE; + + /* Multiline core mode */ + if (ctx->ml_ctx) { + /* + * Create inode str to get stream_id. + * + * If stream_id is created by filename, + * it will be same after file rotation and it causes invalid destruction: + * + * - https://github.com/fluent/fluent-bit/issues/4190 + */ + inode_str = flb_sds_create_size(64); + flb_sds_printf(&inode_str, "%"PRIu64, file->inode); + + /* Create a stream for this file */ + ret = flb_ml_stream_create(ctx->ml_ctx, + inode_str, flb_sds_len(inode_str), + ml_flush_callback, file, + &stream_id); + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not create multiline stream for file: %s", + inode_str); + flb_sds_destroy(inode_str); + goto error; + } + file->ml_stream_id = stream_id; + flb_sds_destroy(inode_str); + + /* + * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready + * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In + * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline + * without any previous buffering which leads to performance degradation. + * + * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish + * processing the "read() bytes". + */ + } + + /* Local buffer */ + file->buf_size = ctx->buf_chunk_size; + file->buf_data = flb_malloc(file->buf_size); + if (!file->buf_data) { + flb_errno(); + goto error; + } + + /* Initialize (optional) dynamic tag */ + if (ctx->dynamic_tag == FLB_TRUE) { + len = ctx->ins->tag_len + strlen(path) + 1; + tag = flb_malloc(len); + if (!tag) { + flb_errno(); + flb_plg_error(ctx->ins, "failed to allocate tag buffer"); + goto error; + } +#ifdef FLB_HAVE_REGEX + ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx); +#else + ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx); +#endif + if (ret == 0) { + file->tag_len = tag_len; + file->tag_buf = flb_strdup(tag); + } + flb_free(tag); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path); + goto error; + } + } + else { + file->tag_len = strlen(ctx->ins->tag); + file->tag_buf = flb_strdup(ctx->ins->tag); + } + if (!file->tag_buf) { + flb_plg_error(ctx->ins, "failed to set tag for file: %s", path); + flb_errno(); + goto error; + } + + if (mode == FLB_TAIL_STATIC) { + mk_list_add(&file->_head, &ctx->files_static); + ctx->files_static_count++; + flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + tail_signal_manager(file->config); + } + else if (mode == FLB_TAIL_EVENT) { + mk_list_add(&file->_head, &ctx->files_event); + flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + + /* Register this file into the fs_event monitoring */ + ret = flb_tail_fs_add(ctx, file); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register file into fs_events"); + goto error; + } + } + + /* Set the file position (database offset, head or tail) */ + ret = set_file_position(ctx, file); + if (ret == -1) { + flb_tail_file_remove(file); + goto error; + } + + /* Remaining bytes to read */ + file->pending_bytes = file->size - file->offset; + +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); + + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); +#endif + + file->sl_log_event_encoder = flb_log_event_encoder_create( + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (file->sl_log_event_encoder == NULL) { + flb_tail_file_remove(file); + + goto error; + } + + file->ml_log_event_encoder = flb_log_event_encoder_create( + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (file->ml_log_event_encoder == NULL) { + flb_tail_file_remove(file); + + goto error; + } + + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" with offset=%"PRId64" appended as %s", + file->inode, file->offset, path); + return 0; + +error: + if (file) { + if (file->buf_data) { + flb_free(file->buf_data); + } + if (file->name) { + flb_free(file->name); + } + flb_free(file); + } + close(fd); + + return -1; +} + +void flb_tail_file_remove(struct flb_tail_file *file) +{ + uint64_t ts; + char *name; + struct flb_tail_config *ctx; + + ctx = file->config; + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", + file->inode, file->name); + + /* remove the multiline.core stream */ + if (ctx->ml_ctx && file->ml_stream_id > 0) { + /* destroy ml stream */ + flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id); + } + + if (file->rotated > 0) { +#ifdef FLB_HAVE_SQLDB + /* + * Make sure to remove a the file entry from the database if the file + * was rotated and it's not longer being monitored. + */ + if (ctx->db) { + flb_tail_db_file_delete(file, file->config); + } +#endif + mk_list_del(&file->_rotate_head); + } + + msgpack_sbuffer_destroy(&file->mult_sbuf); + + if (file->sl_log_event_encoder != NULL) { + flb_log_event_encoder_destroy(file->sl_log_event_encoder); + } + + if (file->ml_log_event_encoder != NULL) { + flb_log_event_encoder_destroy(file->ml_log_event_encoder); + } + + flb_sds_destroy(file->dmode_buf); + flb_sds_destroy(file->dmode_lastline); + mk_list_del(&file->_head); + flb_tail_fs_remove(ctx, file); + + /* avoid deleting file with -1 fd */ + if (file->fd != -1) { + close(file->fd); + } + if (file->tag_buf) { + flb_free(file->tag_buf); + } + + /* remove any potential entry from the hash tables */ + flb_hash_table_del(ctx->static_hash, file->hash_key); + flb_hash_table_del(ctx->event_hash, file->hash_key); + + flb_free(file->buf_data); + flb_free(file->name); + flb_free(file->orig_name); + flb_free(file->real_name); + flb_sds_destroy(file->hash_key); + +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics); +#endif + + flb_free(file); +} + +int flb_tail_file_remove_all(struct flb_tail_config *ctx) +{ + int count = 0; + struct mk_list *head; + struct mk_list *tmp; + struct flb_tail_file *file; + + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + flb_tail_file_remove(file); + count++; + } + + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + flb_tail_file_remove(file); + count++; + } + + return count; +} + +static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ + int ret; + int64_t offset; + struct stat st; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + /* Check if the file was truncated */ + if (file->offset > st.st_size) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", + file->inode, file->name); + file->offset = offset; + file->buf_len = 0; + + /* Update offset in the database file */ +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + else { + file->size = st.st_size; + file->pending_bytes = (st.st_size - file->offset); + } + + return FLB_TAIL_OK; +} + +int flb_tail_file_chunk(struct flb_tail_file *file) +{ + int ret; + char *tmp; + size_t size; + size_t capacity; + size_t processed_bytes; + ssize_t bytes; + struct flb_tail_config *ctx; + + /* Check if we the engine issued a pause */ + ctx = file->config; + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + return FLB_TAIL_BUSY; + } + + capacity = (file->buf_size - file->buf_len) - 1; + if (capacity < 1) { + /* + * If there is no more room for more data, try to increase the + * buffer under the limit of buffer_max_size. + */ + if (file->buf_size >= ctx->buf_max_size) { + if (ctx->skip_long_lines == FLB_FALSE) { + flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " + "lines are too long. Skipping file.", file->name); + return FLB_TAIL_ERROR; + } + + /* Warn the user */ + if (file->skip_warn == FLB_FALSE) { + flb_plg_warn(ctx->ins, "file=%s have long lines. " + "Skipping long lines.", file->name); + file->skip_warn = FLB_TRUE; + } + + /* Do buffer adjustments */ + file->offset += file->buf_len; + file->buf_len = 0; + file->skip_next = FLB_TRUE; + } + else { + size = file->buf_size + ctx->buf_chunk_size; + if (size > ctx->buf_max_size) { + size = ctx->buf_max_size; + } + + /* Increase the buffer size */ + tmp = flb_realloc(file->buf_data, size); + if (tmp) { + flb_plg_trace(ctx->ins, "file=%s increase buffer size " + "%lu => %lu bytes", + file->name, file->buf_size, size); + file->buf_data = tmp; + file->buf_size = size; + } + else { + flb_errno(); + flb_plg_error(ctx->ins, "cannot increase buffer size for %s, " + "skipping file.", file->name); + return FLB_TAIL_ERROR; + } + } + capacity = (file->buf_size - file->buf_len) - 1; + } + + bytes = read(file->fd, file->buf_data + file->buf_len, capacity); + if (bytes > 0) { + /* we read some data, let the content processor take care of it */ + file->buf_len += bytes; + file->buf_data[file->buf_len] = '\0'; + + /* Now that we have some data in the buffer, call the data processor + * which aims to cut lines and register the entries into the engine. + * + * The returned value is the absolute offset the file must be seek + * now. It may need to get back a few bytes at the beginning of a new + * line. + */ + ret = process_content(file, &processed_bytes); + if (ret < 0) { + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", + file->inode, file->name); + return FLB_TAIL_ERROR; + } + + /* Adjust the file offset and buffer */ + file->offset += processed_bytes; + consume_bytes(file->buf_data, processed_bytes, file->buf_len); + file->buf_len -= processed_bytes; + file->buf_data[file->buf_len] = '\0'; + +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + flb_tail_db_file_offset(file, file->config); + } +#endif + + /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ + ret = adjust_counters(ctx, file); + + /* Data was consumed but likely some bytes still remain */ + return ret; + } + else if (bytes == 0) { + /* We reached the end of file, let's wait for some incoming data */ + ret = adjust_counters(ctx, file); + if (ret == FLB_TAIL_OK) { + return FLB_TAIL_WAIT; + } + else { + return FLB_TAIL_ERROR; + } + } + else { + /* error */ + flb_errno(); + flb_plg_error(ctx->ins, "error reading %s", file->name); + return FLB_TAIL_ERROR; + } + + return FLB_TAIL_ERROR; +} + +/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */ +int flb_tail_file_is_rotated(struct flb_tail_config *ctx, + struct flb_tail_file *file) +{ + int ret; + char *name; + struct stat st; + + /* + * Do not double-check already rotated files since the caller of this + * function will trigger a rotation. + */ + if (file->rotated != 0) { + return FLB_FALSE; + } + + /* Check if the 'original monitored file' is a link and rotated */ + if (file->is_link == FLB_TRUE) { + ret = lstat(file->name, &st); + if (ret == -1) { + /* Broken link or missing file */ + if (errno == ENOENT) { + flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s", + file->link_inode, file->name); + return FLB_TRUE; + } + else { + flb_errno(); + flb_plg_error(ctx->ins, + "link_inode=%"PRIu64" cannot detect if file: %s", + file->link_inode, file->name); + return -1; + } + } + else { + /* The file name is there, check if the same that we have */ + if (st.st_ino != file->link_inode) { + return FLB_TRUE; + } + } + } + + /* Retrieve the real file name, operating system lookup */ + name = flb_tail_file_name(file); + if (!name) { + flb_plg_error(ctx->ins, + "inode=%"PRIu64" cannot detect if file was rotated: %s", + file->inode, file->name); + return -1; + } + + + /* Get stats from the file name */ + ret = stat(name, &st); + if (ret == -1) { + flb_errno(); + flb_free(name); + return -1; + } + + /* Compare inodes and names */ + if (file->inode == st.st_ino && + flb_tail_target_file_name_cmp(name, file) == 0) { + flb_free(name); + return FLB_FALSE; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s", + file->inode, file->name, name); + + flb_free(name); + return FLB_TRUE; +} + +/* Promote a event in the static list to the dynamic 'events' interface */ +int flb_tail_file_to_event(struct flb_tail_file *file) +{ + int ret; + struct stat st; + struct flb_tail_config *ctx = file->config; + + /* Check if the file promoted have pending bytes */ + ret = fstat(file->fd, &st); + if (ret != 0) { + flb_errno(); + return -1; + } + + if (file->offset < st.st_size) { + file->pending_bytes = (st.st_size - file->offset); + tail_signal_pending(file->config); + } + else { + file->pending_bytes = 0; + } + + /* Check if the file has been rotated */ + ret = flb_tail_file_is_rotated(ctx, file); + if (ret == FLB_TRUE) { + flb_tail_file_rotated(file); + } + + /* Notify the fs-event handler that we will start monitoring this 'file' */ + ret = flb_tail_fs_add(ctx, file); + if (ret == -1) { + return -1; + } + + /* List swap: change from 'static' to 'event' list */ + mk_list_del(&file->_head); + ctx->files_static_count--; + flb_hash_table_del(ctx->static_hash, file->hash_key); + + mk_list_add(&file->_head, &file->config->files_event); + flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + + file->tail_mode = FLB_TAIL_EVENT; + + return 0; +} + +/* + * Given an open file descriptor, return the filename. This function is a + * bit slow and it aims to be used only when a file is rotated. + */ +char *flb_tail_file_name(struct flb_tail_file *file) +{ + int ret; + char *buf; +#ifdef __linux__ + ssize_t s; + char tmp[128]; +#elif defined(__APPLE__) + char path[PATH_MAX]; +#elif defined(FLB_SYSTEM_WINDOWS) + HANDLE h; +#elif defined(FLB_SYSTEM_FREEBSD) + struct kinfo_file *file_entries; + int file_count; + int file_index; +#endif + + buf = flb_malloc(PATH_MAX); + if (!buf) { + flb_errno(); + return NULL; + } + +#ifdef __linux__ + ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd); + if (ret == -1) { + flb_errno(); + flb_free(buf); + return NULL; + } + + s = readlink(tmp, buf, PATH_MAX); + if (s == -1) { + flb_free(buf); + flb_errno(); + return NULL; + } + buf[s] = '\0'; + +#elif __APPLE__ + int len; + + ret = fcntl(file->fd, F_GETPATH, path); + if (ret == -1) { + flb_errno(); + flb_free(buf); + return NULL; + } + + len = strlen(path); + memcpy(buf, path, len); + buf[len] = '\0'; + +#elif defined(FLB_SYSTEM_WINDOWS) + int len; + + h = (HANDLE) _get_osfhandle(file->fd); + if (h == INVALID_HANDLE_VALUE) { + flb_errno(); + flb_free(buf); + return NULL; + } + + /* This function returns the length of the string excluding "\0" + * and the resulting path has a "\\?\" prefix. + */ + len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED); + if (len == 0 || len >= PATH_MAX) { + flb_free(buf); + return NULL; + } + + if (strstr(buf, "\\\\?\\")) { + memmove(buf, buf + 4, len + 1); + } +#elif defined(FLB_SYSTEM_FREEBSD) + if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) { + flb_free(buf); + return NULL; + } + + for (file_index=0; file_index < file_count; file_index++) { + if (file_entries[file_index].kf_fd == file->fd) { + strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1); + buf[PATH_MAX - 1] = 0; + break; + } + } + free(file_entries); +#endif + return buf; +} + +int flb_tail_file_name_dup(char *path, struct flb_tail_file *file) +{ + file->name = flb_strdup(path); + if (!file->name) { + flb_errno(); + return -1; + } + file->name_len = strlen(file->name); + + if (file->real_name) { + flb_free(file->real_name); + } + + file->real_name = flb_tail_file_name(file); + if (!file->real_name) { + flb_errno(); + flb_free(file->name); + file->name = NULL; + return -1; + } + + return 0; +} + +/* Invoked every time a file was rotated */ +int flb_tail_file_rotated(struct flb_tail_file *file) +{ + int ret; + uint64_t ts; + char *name; + char *i_name; + char *tmp; + struct stat st; + struct flb_tail_config *ctx = file->config; + + /* Get the new file name */ + name = flb_tail_file_name(file); + if (!name) { + return -1; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s", + file->inode, file->name, name); + + /* Update local file entry */ + tmp = file->name; + flb_tail_file_name_dup(name, file); + flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s", + file->inode, tmp, file->name); + if (file->rotated == 0) { + file->rotated = time(NULL); + mk_list_add(&file->_rotate_head, &file->config->files_rotated); + + /* Rotate the file in the database */ +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + ret = flb_tail_db_file_rotate(name, file, file->config); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not rotate file %s->%s in database", + file->name, name); + } + } +#endif + +#ifdef FLB_HAVE_METRICS + i_name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name}); + + /* OLD api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED, + 1, file->config->ins->metrics); +#endif + + /* Check if a new file has been created */ + ret = stat(tmp, &st); + if (ret == 0 && st.st_ino != file->inode) { + if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) { + ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx); + if (ret == -1) { + flb_tail_scan(ctx->path_list, ctx); + } + else { + tail_signal_manager(file->config); + } + } + } + } + flb_free(tmp); + flb_free(name); + + return 0; +} + +static int check_purge_deleted_file(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t ts) +{ + int ret; + struct stat st; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); + flb_tail_file_remove(file); + return FLB_TRUE; + } + + if (st.st_nlink == 0) { + flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s", + file->name); +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + /* Remove file entry from the database */ + flb_tail_db_file_delete(file, file->config); + } +#endif + /* Remove file from the monitored list */ + flb_tail_file_remove(file); + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* Purge rotated and deleted files */ +int flb_tail_file_purge(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + int ret; + int count = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + time_t now; + struct stat st; + + /* Rotated files */ + now = time(NULL); + mk_list_foreach_safe(head, tmp, &ctx->files_rotated) { + file = mk_list_entry(head, struct flb_tail_file, _rotate_head); + if ((file->rotated + ctx->rotate_wait) <= now) { + ret = fstat(file->fd, &st); + if (ret == 0) { + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" purge rotated file %s " \ + "(offset=%"PRId64" / size = %"PRIu64")", + file->inode, file->name, file->offset, (uint64_t)st.st_size); + if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) { + flb_plg_warn(ctx->ins, "purged rotated file while data " + "ingestion is paused, consider increasing " + "rotate_wait"); + } + } + else { + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")", + file->inode, file->name, file->offset); + } + + flb_tail_file_remove(file); + count++; + } + } + + /* + * Deleted files: under high load scenarios, exists the chances that in + * our event loop we miss some notifications about a file. In order to + * sanitize our list of monitored files we will iterate all of them and check + * if they have been deleted or not. + */ + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + check_purge_deleted_file(ctx, file, now); + } + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + check_purge_deleted_file(ctx, file, now); + } + + return count; +} diff --git a/fluent-bit/plugins/in_tail/tail_file.h b/fluent-bit/plugins/in_tail/tail_file.h new file mode 100644 index 000000000..796224c37 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_file.h @@ -0,0 +1,137 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_FILE_H +#define FLB_TAIL_FILE_H + +#include <sys/types.h> +#include <sys/stat.h> + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_input.h> + +#include "tail.h" +#include "tail_fs.h" +#include "tail_config.h" +#include "tail_file_internal.h" + +#ifdef FLB_SYSTEM_WINDOWS +#include "win32.h" +#endif + +#ifdef FLB_HAVE_REGEX +#define FLB_HASH_TABLE_SIZE 50 +#endif + +/* return the file modification time in seconds since epoch */ +static inline int64_t flb_tail_stat_mtime(struct stat *st) +{ +#if defined(FLB_HAVE_WINDOWS) + return (int64_t) st->st_mtime; +#elif defined(__APPLE__) && !defined(_POSIX_C_SOURCE) + return (int64_t) st->st_mtimespec.tv_sec; +#elif (_POSIX_C_SOURCE >= 200809L || \ + defined(_BSD_SOURCE) || defined(_SVID_SOURCE) || \ + defined(__BIONIC__) || (defined (__SVR4) && defined (__sun)) || \ + defined(__FreeBSD__) || defined (__linux__)) + return (int64_t) st->st_mtim.tv_sec; +#elif defined(_AIX) + return (int64_t) st->st_mtime; +#else + return (int64_t) st->st_mtime; +#endif + + /* backend unsupported: submit a PR :) */ + return -1; +} + +static inline int flb_tail_target_file_name_cmp(char *name, + struct flb_tail_file *file) +{ + int ret; + char *name_a = NULL; + char *name_b = NULL; + char *base_a = NULL; + char *base_b = NULL; + + name_a = flb_strdup(name); + if (!name_a) { + flb_errno(); + ret = -1; + goto out; + } + + base_a = flb_strdup(basename(name_a)); + if (!base_a) { + flb_errno(); + ret = -1; + goto out; + } + +#if defined(FLB_SYSTEM_WINDOWS) + name_b = flb_strdup(file->real_name); + if (!name_b) { + flb_errno(); + ret = -1; + goto out; + } + + base_b = basename(name_b); + ret = _stricmp(base_a, base_b); +#else + name_b = flb_strdup(file->real_name); + if (!name_b) { + flb_errno(); + ret = -1; + goto out; + } + base_b = basename(name_b); + ret = strcmp(base_a, base_b); +#endif + + out: + flb_free(name_a); + flb_free(name_b); + flb_free(base_a); + + /* FYI: 'base_b' never points to a new allocation, no flb_free is needed */ + + return ret; +} + +int flb_tail_file_name_dup(char *path, struct flb_tail_file *file); +int flb_tail_file_to_event(struct flb_tail_file *file); +int flb_tail_file_chunk(struct flb_tail_file *file); +int flb_tail_file_append(char *path, struct stat *st, int mode, + struct flb_tail_config *ctx); +void flb_tail_file_remove(struct flb_tail_file *file); +int flb_tail_file_remove_all(struct flb_tail_config *ctx); +char *flb_tail_file_name(struct flb_tail_file *file); +int flb_tail_file_is_rotated(struct flb_tail_config *ctx, + struct flb_tail_file *file); +int flb_tail_file_rotated(struct flb_tail_file *file); +int flb_tail_file_purge(struct flb_input_instance *ins, + struct flb_config *config, void *context); +int flb_tail_pack_line_map(struct flb_time *time, char **data, + size_t *data_size, struct flb_tail_file *file, + size_t processed_bytes); +int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, + struct flb_tail_file *file, size_t processed_bytes); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_file_internal.h b/fluent-bit/plugins/in_tail/tail_file_internal.h new file mode 100644 index 000000000..6d95c87c1 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_file_internal.h @@ -0,0 +1,130 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_INTERNAL_H +#define FLB_TAIL_INTERNAL_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#ifdef FLB_HAVE_PARSER +#include <fluent-bit/multiline/flb_ml.h> +#endif + +#include "tail.h" +#include "tail_config.h" + +struct flb_tail_file { + /* Inotify */ + int watch_fd; + /* file lookup info */ + int fd; + int64_t size; + int64_t offset; + int64_t last_line; + uint64_t dev_id; + uint64_t inode; + uint64_t link_inode; + int is_link; + char *name; /* target file name given by scan routine */ + char *real_name; /* real file name in the file system */ + char *orig_name; /* original file name (before rotation) */ + size_t name_len; + size_t orig_name_len; + time_t rotated; + int64_t pending_bytes; + + /* dynamic tag for this file */ + int tag_len; + char *tag_buf; + + /* OLD multiline */ + time_t mult_flush_timeout; /* time when multiline started */ + int mult_firstline; /* bool: mult firstline found ? */ + int mult_firstline_append; /* bool: mult firstline appendable ? */ + int mult_skipping; /* skipping because ignode_older than ? */ + int mult_keys; /* total number of buffered keys */ + + int mult_records; /* multiline records counter mult_sbuf */ + msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */ + msgpack_packer mult_pck; /* temporary msgpack packer */ + struct flb_time mult_time; /* multiline time parsed from first line */ + + /* OLD docker mode */ + time_t dmode_flush_timeout; /* time when docker mode started */ + flb_sds_t dmode_buf; /* buffer for docker mode */ + flb_sds_t dmode_lastline; /* last incomplete line */ + bool dmode_complete; /* buffer contains completed log */ + bool dmode_firstline; /* dmode mult firstline found ? */ + + /* multiline engine: file stream_id and local buffers */ + uint64_t ml_stream_id; + + /* content parsing, positions and buffer */ + size_t parsed; + size_t buf_len; + size_t buf_size; + char *buf_data; + + /* + * This value represent the number of bytes procesed by process_content() + * in the last iteration. + */ + size_t last_processed_bytes; + + /* + * Long-lines handling: this flag is enabled when a previous line was + * too long and the buffer did not contain a \n, so when reaching the + * missing \n, skip that content and move forward. + * + * This flag is only set when Skip_Long_Lines is On. + */ + int skip_next; + + /* Did the plugin already warn the user about long lines ? */ + int skip_warn; + + /* Opaque data type for specific fs-event backend data */ + void *fs_backend; + + /* database reference */ + uint64_t db_id; + + uint64_t hash_bits; + flb_sds_t hash_key; + + /* There are dedicated log event encoders for + * single and multi line events because I am respecting + * the old behavior which resulted in grouping both types + * of logs in tail_file.c but I don't know if this is + * strictly necessary. + */ + struct flb_log_event_encoder *ml_log_event_encoder; + struct flb_log_event_encoder *sl_log_event_encoder; + + /* reference */ + int tail_mode; + struct flb_tail_config *config; + struct mk_list _head; + struct mk_list _rotate_head; +}; +#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs.h b/fluent-bit/plugins/in_tail/tail_fs.h new file mode 100644 index 000000000..948954333 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_fs.h @@ -0,0 +1,96 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_FS_H +#define FLB_TAIL_FS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> + +#include "tail_config.h" +#include "tail_file_internal.h" + +#include "tail_fs_stat.h" +#ifdef FLB_HAVE_INOTIFY +#include "tail_fs_inotify.h" +#endif + +static inline int flb_tail_fs_init(struct flb_input_instance *in, + struct flb_tail_config *ctx, struct flb_config *config) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_init(in, ctx, config); + } +#endif + return flb_tail_fs_stat_init(in, ctx, config); +} + +static inline void flb_tail_fs_pause(struct flb_tail_config *ctx) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_pause(ctx); + } +#endif + return flb_tail_fs_stat_pause(ctx); +} + +static inline void flb_tail_fs_resume(struct flb_tail_config *ctx) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_resume(ctx); + } +#endif + return flb_tail_fs_stat_resume(ctx); +} + +static inline int flb_tail_fs_add(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_add(file); + } +#endif + return flb_tail_fs_stat_add(file); +} + +static inline int flb_tail_fs_remove(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_remove(file); + } +#endif + return flb_tail_fs_stat_remove(file); +} + +static inline int flb_tail_fs_exit(struct flb_tail_config *ctx) +{ +#ifdef FLB_HAVE_INOTIFY + if (ctx->inotify_watcher) { + return flb_tail_fs_inotify_exit(ctx); + } +#endif + return flb_tail_fs_stat_exit(ctx); +} + + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.c b/fluent-bit/plugins/in_tail/tail_fs_inotify.c new file mode 100644 index 000000000..59d10ca08 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_fs_inotify.c @@ -0,0 +1,433 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _DEFAULT_SOURCE + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/inotify.h> + +#include "tail_config.h" +#include "tail_file.h" +#include "tail_db.h" +#include "tail_signal.h" + +#include <limits.h> +#include <fcntl.h> + +#include <sys/ioctl.h> + +static int debug_event_mask(struct flb_tail_config *ctx, + struct flb_tail_file *file, + uint32_t mask) +{ + flb_sds_t buf; + int buf_size = 256; + + /* Only enter this function if debug mode is allowed */ + if (flb_log_check(FLB_LOG_DEBUG) == 0) { + return 0; + } + + if (file) { + buf_size = file->name_len + 128; + } + + if (buf_size < 256) { + buf_size = 256; + } + + /* Create buffer */ + buf = flb_sds_create_size(buf_size); + if (!buf) { + return -1; + } + + /* Print info into sds */ + if (file) { + flb_sds_printf(&buf, "inode=%"PRIu64", %s, events: ", file->inode, file->name); + } + else { + flb_sds_printf(&buf, "events: "); + } + + if (mask & IN_ATTRIB) { + flb_sds_printf(&buf, "IN_ATTRIB "); + } + if (mask & IN_IGNORED) { + flb_sds_printf(&buf, "IN_IGNORED "); + } + if (mask & IN_MODIFY) { + flb_sds_printf(&buf, "IN_MODIFY "); + } + if (mask & IN_MOVE_SELF) { + flb_sds_printf(&buf, "IN_MOVE_SELF "); + } + if (mask & IN_Q_OVERFLOW) { + flb_sds_printf(&buf, "IN_Q_OVERFLOW "); + } + + flb_plg_debug(ctx->ins, "%s", buf); + flb_sds_destroy(buf); + + return 0; +} + +static int tail_fs_add(struct flb_tail_file *file, int check_rotated) +{ + int flags; + int watch_fd; + char *name; + struct flb_tail_config *ctx = file->config; + + /* + * If there is no watcher associated, we only want to monitor events if + * this file is rotated to somewhere. Note at this point we are polling + * lines from the file and once we reach EOF (and a watch_fd exists), + * we update the flags to receive notifications. + */ + flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW; + + if (check_rotated == FLB_TRUE) { + flags |= IN_MOVE_SELF; + } + + /* + * Double check real name of the file associated to the inode: + * + * Inotify interface in the Kernel uses the inode number as a real reference + * for the file we have opened. If for some reason the file we are pointing + * out in file->name has been rotated and not been updated, we might not add + * the watch to the real file we aim to. + * + * A case like this can generate the issue: + * + * 1. inode=1 : file a.log is being watched + * 2. inode=1 : file a.log is rotated to a.log.1, but notification not + * delivered yet. + * 3. inode=2 : new file 'a.log' is created + * 4. inode=2 : the scan_path routine discover the new 'a.log' file + * 5. inode=2 : add an inotify watcher for 'a.log' + * 6. conflict: inotify_add_watch() receives the path 'a.log', + */ + + name = flb_tail_file_name(file); + if (!name) { + flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot get real filename for inotify", + file->inode); + return -1; + } + + /* Register or update the flags */ + watch_fd = inotify_add_watch(ctx->fd_notify, name, flags); + flb_free(name); + + if (watch_fd == -1) { + flb_errno(); + if (errno == ENOSPC) { + flb_plg_error(ctx->ins, "inotify: The user limit on the total " + "number of inotify watches was reached or the kernel " + "failed to allocate a needed resource (ENOSPC)"); + } + return -1; + } + file->watch_fd = watch_fd; + flb_plg_info(ctx->ins, "inotify_fs_add(): inode=%"PRIu64" watch_fd=%i name=%s", + file->inode, watch_fd, file->name); + return 0; +} + +static int flb_tail_fs_add_rotated(struct flb_tail_file *file) +{ + return tail_fs_add(file, FLB_FALSE); +} + +static int tail_fs_event(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + int64_t offset; + struct mk_list *head; + struct mk_list *tmp; + struct flb_tail_config *ctx = in_context; + struct flb_tail_file *file = NULL; + struct inotify_event ev; + struct stat st; + + /* Read the event */ + ret = read(ctx->fd_notify, &ev, sizeof(struct inotify_event)); + if (ret < 1) { + return -1; + } + + /* Lookup watched file */ + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + if (file->watch_fd != ev.wd) { + file = NULL; + continue; + } + break; + } + + if (!file) { + return -1; + } + + /* Debug event */ + debug_event_mask(ctx, file, ev.mask); + + if (ev.mask & IN_IGNORED) { + flb_plg_debug(ctx->ins, "inode=%"PRIu64" watch_fd=%i IN_IGNORED", + file->inode, ev.wd); + return -1; + } + + /* Check file rotation (only if it has not been rotated before) */ + if (ev.mask & IN_MOVE_SELF && file->rotated == 0) { + flb_plg_debug(ins, "inode=%"PRIu64" rotated IN_MOVE SELF '%s'", + file->inode, file->name); + + /* A rotated file must be re-registered */ + flb_tail_file_rotated(file); + flb_tail_fs_remove(ctx, file); + flb_tail_fs_add_rotated(file); + } + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing", + file->inode, file->name); + flb_tail_file_remove(file); + return 0; + } + file->size = st.st_size; + file->pending_bytes = (file->size - file->offset); + + /* File was removed ? */ + if (ev.mask & IN_ATTRIB) { + /* Check if the file have been deleted */ + if (st.st_nlink == 0) { + flb_plg_debug(ins, "inode=%"PRIu64" file has been deleted: %s", + file->inode, file->name); + +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + /* Remove file entry from the database */ + flb_tail_db_file_delete(file, ctx); + } +#endif + /* Remove file from the monitored list */ + flb_tail_file_remove(file); + return 0; + } + } + + if (ev.mask & IN_MODIFY) { + /* + * The file was modified, check how many new bytes do + * we have. + */ + + /* Check if the file was truncated */ + if (file->offset > st.st_size) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return -1; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", + file->inode, file->name); + file->offset = offset; + file->buf_len = 0; + + /* Update offset in the database file */ +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + } + + /* Collect the data */ + ret = in_tail_collect_event(file, config); + if (ret != FLB_TAIL_ERROR) { + /* + * Due to read buffer size capacity, there are some cases where the + * read operation cannot consume all new data available on one + * round; upon successfull read(2) some data can still remain. + * + * If that is the case, we set in the structure how + * many bytes are available 'now', so then the further + * routine that check pending bytes and then the inotified-file + * can process them properly after an internal signal. + * + * The goal to defer this routine is to avoid a blocking + * read(2) operation, that might kill performance. Just let's + * wait a second and do a good job. + */ + tail_signal_pending(ctx); + } + else { + return ret; + } + + return 0; +} + +static int in_tail_progress_check_callback(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + int ret = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_config *ctx = context; + struct flb_tail_file *file; + int pending_data_detected; + struct stat st; + + (void) config; + + pending_data_detected = FLB_FALSE; + + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + if (file->offset < file->size) { + pending_data_detected = FLB_TRUE; + + continue; + } + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_errno(); + flb_plg_error(ins, "fstat error"); + + continue; + } + + if (file->offset < st.st_size) { + file->size = st.st_size; + file->pending_bytes = (file->size - file->offset); + + pending_data_detected = FLB_TRUE; + } + } + + if (pending_data_detected) { + tail_signal_pending(ctx); + } + + return 0; +} + +/* File System events based on Inotify(2). Linux >= 2.6.32 is suggested */ +int flb_tail_fs_inotify_init(struct flb_input_instance *in, + struct flb_tail_config *ctx, struct flb_config *config) +{ + int fd; + int ret; + + flb_plg_debug(ctx->ins, "flb_tail_fs_inotify_init() initializing inotify tail input"); + + /* Create inotify instance */ + fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); + if (fd == -1) { + flb_errno(); + return -1; + } + flb_plg_debug(ctx->ins, "inotify watch fd=%i", fd); + ctx->fd_notify = fd; + + /* This backend use Fluent Bit event-loop to trigger notifications */ + ret = flb_input_set_collector_event(in, tail_fs_event, + ctx->fd_notify, config); + if (ret < 0) { + close(fd); + return -1; + } + ctx->coll_fd_fs1 = ret; + + /* Register callback to check current tail offsets */ + ret = flb_input_set_collector_time(in, in_tail_progress_check_callback, + ctx->progress_check_interval, + ctx->progress_check_interval_nsec, + config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } + ctx->coll_fd_progress_check = ret; + + return 0; +} + +void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx) +{ + flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins); +} + +void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx) +{ + flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins); +} + +int flb_tail_fs_inotify_add(struct flb_tail_file *file) +{ + int ret; + struct flb_tail_config *ctx = file->config; + + ret = tail_fs_add(file, FLB_TRUE); + if (ret == -1) { + flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot register file %s", + file->inode, file->name); + return -1; + } + + return 0; +} + +int flb_tail_fs_inotify_remove(struct flb_tail_file *file) +{ + struct flb_tail_config *ctx = file->config; + + if (file->watch_fd == -1) { + return 0; + } + + flb_plg_info(ctx->ins, "inotify_fs_remove(): inode=%"PRIu64" watch_fd=%i", + file->inode, file->watch_fd); + + inotify_rm_watch(file->config->fd_notify, file->watch_fd); + file->watch_fd = -1; + return 0; +} + +int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx) +{ + return close(ctx->fd_notify); +} diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.h b/fluent-bit/plugins/in_tail/tail_fs_inotify.h new file mode 100644 index 000000000..128ab0624 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_fs_inotify.h @@ -0,0 +1,37 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_FS_INOTIFY_H +#define FLB_TAIL_FS_INOTIFY_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> + +#include "tail_config.h" +#include "tail_file_internal.h" + +int flb_tail_fs_inotify_init(struct flb_input_instance *in, + struct flb_tail_config *ctx, struct flb_config *config); +int flb_tail_fs_inotify_add(struct flb_tail_file *file); +int flb_tail_fs_inotify_remove(struct flb_tail_file *file); +int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx); +void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx); +void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.c b/fluent-bit/plugins/in_tail/tail_fs_stat.c new file mode 100644 index 000000000..6b312c9bd --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_fs_stat.c @@ -0,0 +1,253 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _DEFAULT_SOURCE + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> + +#include <sys/types.h> +#include <sys/stat.h> + +#include "tail_file.h" +#include "tail_db.h" +#include "tail_config.h" +#include "tail_signal.h" + +#ifdef FLB_SYSTEM_WINDOWS +#include "win32.h" +#endif + +struct fs_stat { + /* last time check */ + time_t checked; + + /* previous status */ + struct stat st; +}; + +static int tail_fs_event(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + struct mk_list *head; + struct mk_list *tmp; + struct flb_tail_config *ctx = in_context; + struct flb_tail_file *file = NULL; + struct fs_stat *fst; + struct stat st; + time_t t; + + t = time(NULL); + + /* Lookup watched file */ + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + fst = file->fs_backend; + + /* Check current status of the file */ + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_errno(); + continue; + } + + /* Check if the file was modified */ + if ((fst->st.st_mtime != st.st_mtime) || + (fst->st.st_size != st.st_size)) { + /* Update stat info and trigger the notification */ + memcpy(&fst->st, &st, sizeof(struct stat)); + fst->checked = t; + in_tail_collect_event(file, config); + } + } + + return 0; +} + +static int tail_fs_check(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + int64_t offset; + char *name; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_config *ctx = in_context; + struct flb_tail_file *file = NULL; + struct fs_stat *fst; + struct stat st; + + /* Lookup watched file */ + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + fst = file->fs_backend; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); + flb_tail_file_remove(file); + continue; + } + + /* Check if the file have been deleted */ + if (st.st_nlink == 0) { + flb_plg_debug(ctx->ins, "file has been deleted: %s", file->name); +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + /* Remove file entry from the database */ + flb_tail_db_file_delete(file, ctx); + } +#endif + flb_tail_file_remove(file); + continue; + } + + /* Check if the file was truncated */ + if (file->offset > st.st_size) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return -1; + } + + flb_plg_debug(ctx->ins, "file truncated %s", file->name); + file->offset = offset; + file->buf_len = 0; + memcpy(&fst->st, &st, sizeof(struct stat)); + +#ifdef FLB_HAVE_SQLDB + /* Update offset in database file */ + if (ctx->db) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + + if (file->offset < st.st_size) { + file->pending_bytes = (st.st_size - file->offset); + tail_signal_pending(ctx); + } + else { + file->pending_bytes = 0; + } + + + /* Discover the current file name for the open file descriptor */ + name = flb_tail_file_name(file); + if (!name) { + flb_plg_debug(ctx->ins, "could not resolve %s, removing", file->name); + flb_tail_file_remove(file); + continue; + } + + /* + * Check if file still exists. This method requires explicity that the + * user is using an absolute path, otherwise we will be rotating the + * wrong file. + * + * flb_tail_target_file_name_cmp is a deeper compare than + * flb_tail_file_name_cmp. If applicable, it compares to the underlying + * real_name of the file. + */ + if (flb_tail_file_is_rotated(ctx, file) == FLB_TRUE) { + flb_tail_file_rotated(file); + } + flb_free(name); + + } + + return 0; +} + +/* File System events based on stat(2) */ +int flb_tail_fs_stat_init(struct flb_input_instance *in, + struct flb_tail_config *ctx, struct flb_config *config) +{ + int ret; + + flb_plg_debug(ctx->ins, "flb_tail_fs_stat_init() initializing stat tail input"); + + /* Set a manual timer to collect events every 0.250 seconds */ + ret = flb_input_set_collector_time(in, tail_fs_event, + 0, 250000000, config); + if (ret < 0) { + return -1; + } + ctx->coll_fd_fs1 = ret; + + /* Set a manual timer to check deleted/rotated files every 2.5 seconds */ + ret = flb_input_set_collector_time(in, tail_fs_check, + 2, 500000000, config); + if (ret < 0) { + return -1; + } + ctx->coll_fd_fs2 = ret; + + return 0; +} + +void flb_tail_fs_stat_pause(struct flb_tail_config *ctx) +{ + flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins); + flb_input_collector_pause(ctx->coll_fd_fs2, ctx->ins); +} + +void flb_tail_fs_stat_resume(struct flb_tail_config *ctx) +{ + flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins); + flb_input_collector_resume(ctx->coll_fd_fs2, ctx->ins); +} + +int flb_tail_fs_stat_add(struct flb_tail_file *file) +{ + int ret; + struct fs_stat *fst; + + fst = flb_malloc(sizeof(struct fs_stat)); + if (!fst) { + flb_errno(); + return -1; + } + + fst->checked = time(NULL); + ret = stat(file->name, &fst->st); + if (ret == -1) { + flb_errno(); + flb_free(fst); + return -1; + } + file->fs_backend = fst; + + return 0; +} + +int flb_tail_fs_stat_remove(struct flb_tail_file *file) +{ + if (file->tail_mode == FLB_TAIL_EVENT) { + flb_free(file->fs_backend); + } + return 0; +} + +int flb_tail_fs_stat_exit(struct flb_tail_config *ctx) +{ + (void) ctx; + return 0; +} diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.h b/fluent-bit/plugins/in_tail/tail_fs_stat.h new file mode 100644 index 000000000..21a0704cb --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_fs_stat.h @@ -0,0 +1,37 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_FS_STAT_H +#define FLB_TAIL_FS_STAT_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> + +#include "tail_config.h" +#include "tail_file_internal.h" + +int flb_tail_fs_stat_init(struct flb_input_instance *in, + struct flb_tail_config *ctx, struct flb_config *config); +int flb_tail_fs_stat_add(struct flb_tail_file *file); +int flb_tail_fs_stat_remove(struct flb_tail_file *file); +int flb_tail_fs_stat_exit(struct flb_tail_config *ctx); +void flb_tail_fs_stat_pause(struct flb_tail_config *ctx); +void flb_tail_fs_stat_resume(struct flb_tail_config *ctx); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_multiline.c b/fluent-bit/plugins/in_tail/tail_multiline.c new file mode 100644 index 000000000..71c031014 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_multiline.c @@ -0,0 +1,606 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_kv.h> + +#include "tail_config.h" +#include "tail_multiline.h" + +static int tail_mult_append(struct flb_parser *parser, + struct flb_tail_config *ctx) +{ + struct flb_tail_mult *mp; + + mp = flb_malloc(sizeof(struct flb_tail_mult)); + if (!mp) { + flb_errno(); + return -1; + } + + mp->parser = parser; + mk_list_add(&mp->_head, &ctx->mult_parsers); + + return 0; +} + +int flb_tail_mult_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + struct mk_list *head; + struct flb_parser *parser; + struct flb_kv *kv; + + if (ctx->multiline_flush <= 0) { + ctx->multiline_flush = 1; + } + + mk_list_init(&ctx->mult_parsers); + + /* Get firstline parser */ + tmp = flb_input_get_property("parser_firstline", ins); + if (!tmp) { + flb_plg_error(ctx->ins, "multiline: no parser defined for firstline"); + return -1; + } + parser = flb_parser_get(tmp, config); + if (!parser) { + flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp); + return -1; + } + + ctx->mult_parser_firstline = parser; + + /* Read all multiline rules */ + mk_list_foreach(head, &ins->properties) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (strcasecmp("parser_firstline", kv->key) == 0) { + continue; + } + + if (strncasecmp("parser_", kv->key, 7) == 0) { + parser = flb_parser_get(kv->val, config); + if (!parser) { + flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val); + return -1; + } + + ret = tail_mult_append(parser, ctx); + if (ret == -1) { + return -1; + } + } + } + + return 0; +} + +int flb_tail_mult_destroy(struct flb_tail_config *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_mult *mp; + + if (ctx->multiline == FLB_FALSE) { + return 0; + } + + mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) { + mp = mk_list_entry(head, struct flb_tail_mult, _head); + mk_list_del(&mp->_head); + flb_free(mp); + } + + return 0; +} + +/* Process the result of a firstline match */ +int flb_tail_mult_process_first(time_t now, + char *buf, size_t size, + struct flb_time *out_time, + struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + size_t off; + msgpack_object map; + msgpack_unpacked result; + + /* If a previous multiline context already exists, flush first */ + if (file->mult_firstline && !file->mult_skipping) { + flb_tail_mult_flush(file, ctx); + } + + /* Remark as first multiline message */ + file->mult_firstline = FLB_TRUE; + + /* Validate obtained time, if not set, set the current time */ + if (flb_time_to_nanosec(out_time) == 0L) { + flb_time_get(out_time); + } + + /* Should we skip this multiline record ? */ + if (ctx->ignore_older > 0) { + if ((now - ctx->ignore_older) > out_time->tm.tv_sec) { + flb_free(buf); + file->mult_skipping = FLB_TRUE; + file->mult_firstline = FLB_TRUE; + + /* we expect more data to skip */ + return FLB_TAIL_MULT_MORE; + } + } + + /* Re-initiate buffers */ + msgpack_sbuffer_init(&file->mult_sbuf); + msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write); + + /* + * flb_parser_do() always return a msgpack buffer, so we tweak our + * local msgpack reference to avoid an extra allocation. The only + * concern is that we don't know what's the real size of the memory + * allocated, so we assume it's just 'out_size'. + */ + file->mult_flush_timeout = now + (ctx->multiline_flush - 1); + file->mult_sbuf.data = buf; + file->mult_sbuf.size = size; + file->mult_sbuf.alloc = size; + + /* Set multiline status */ + file->mult_firstline = FLB_TRUE; + file->mult_skipping = FLB_FALSE; + flb_time_copy(&file->mult_time, out_time); + + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_sbuffer_destroy(&file->mult_sbuf); + msgpack_unpacked_destroy(&result); + return FLB_TAIL_MULT_NA; + } + + map = result.data; + file->mult_keys = map.via.map.size; + msgpack_unpacked_destroy(&result); + + /* We expect more data */ + return FLB_TAIL_MULT_MORE; +} + +/* Append a raw log entry to the last structured field in the mult buffer */ +static inline void flb_tail_mult_append_raw(char *buf, int size, + struct flb_tail_file *file, + struct flb_tail_config *config) +{ + /* Append the raw string */ + msgpack_pack_str(&file->mult_pck, size); + msgpack_pack_str_body(&file->mult_pck, buf, size); +} + +/* Check if the last key value type of a map is string or not */ +static inline int is_last_key_val_string(char *buf, size_t size) +{ + int ret = FLB_FALSE; + size_t off; + msgpack_unpacked result; + msgpack_object v; + msgpack_object root; + + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + return ret; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + ret = FLB_FALSE; + } + else { + if (root.via.map.size == 0) { + ret = FLB_FALSE; + } + else { + v = root.via.map.ptr[root.via.map.size - 1].val; + if (v.type == MSGPACK_OBJECT_STR) { + ret = FLB_TRUE; + } + } + } + + msgpack_unpacked_destroy(&result); + return ret; +} + +int flb_tail_mult_process_content(time_t now, + char *buf, size_t len, + struct flb_tail_file *file, + struct flb_tail_config *ctx, + size_t processed_bytes) +{ + int ret; + size_t off; + void *out_buf; + size_t out_size = 0; + struct mk_list *head; + struct flb_tail_mult *mult_parser = NULL; + struct flb_time out_time = {0}; + msgpack_object map; + msgpack_unpacked result; + + /* Always check if this line is the beginning of a new multiline message */ + ret = flb_parser_do(ctx->mult_parser_firstline, + buf, len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + /* + * The content is a candidate for a firstline, but we need to perform + * the extra-mandatory check where the last key value type must be + * a string, otherwise no string concatenation with continuation lines + * will be possible. + */ + ret = is_last_key_val_string(out_buf, out_size); + if (ret == FLB_TRUE) + file->mult_firstline_append = FLB_TRUE; + else + file->mult_firstline_append = FLB_FALSE; + + flb_tail_mult_process_first(now, out_buf, out_size, &out_time, + file, ctx); + return FLB_TAIL_MULT_MORE; + } + + if (file->mult_skipping == FLB_TRUE) { + return FLB_TAIL_MULT_MORE; + } + + /* + * Once here means we have some data that is a continuation, iterate + * parsers trying to find a match + */ + out_buf = NULL; + mk_list_foreach(head, &ctx->mult_parsers) { + mult_parser = mk_list_entry(head, struct flb_tail_mult, _head); + + /* Process line text with current parser */ + out_buf = NULL; + out_size = 0; + ret = flb_parser_do(mult_parser->parser, + buf, len, + &out_buf, &out_size, &out_time); + if (ret < 0) { + mult_parser = NULL; + continue; + } + + /* The line was processed, break the loop and buffer the data */ + break; + } + + if (!mult_parser) { + /* + * If no parser was found means the string log must be appended + * to the last structured field. + */ + if (file->mult_firstline && file->mult_firstline_append) { + flb_tail_mult_append_raw(buf, len, file, ctx); + } + else { + flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes); + } + + return FLB_TAIL_MULT_MORE; + } + + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, out_buf, out_size, &off); + map = result.data; + + /* Append new map to our local msgpack buffer */ + file->mult_keys += map.via.map.size; + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size); + flb_free(out_buf); + + return FLB_TAIL_MULT_MORE; +} + +static int flb_tail_mult_pack_line_body( + struct flb_log_event_encoder *context, + struct flb_tail_file *file) +{ + size_t adjacent_object_offset; + size_t continuation_length; + msgpack_unpacked adjacent_object; + msgpack_unpacked current_object; + size_t entry_index; + msgpack_object entry_value; + msgpack_object entry_key; + msgpack_object_map *data_map; + int map_size; + size_t offset; + struct flb_tail_config *config; + int result; + + result = FLB_EVENT_ENCODER_SUCCESS; + config = (struct flb_tail_config *) file->config; + + /* New Map size */ + map_size = file->mult_keys; + + if (file->config->path_key != NULL) { + map_size++; + + result = flb_log_event_encoder_append_body_values( + context, + FLB_LOG_EVENT_CSTRING_VALUE(config->path_key), + FLB_LOG_EVENT_CSTRING_VALUE(file->name)); + } + + + msgpack_unpacked_init(¤t_object); + msgpack_unpacked_init(&adjacent_object); + + offset = 0; + + while (result == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(¤t_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &offset) == MSGPACK_UNPACK_SUCCESS) { + if (current_object.data.type != MSGPACK_OBJECT_MAP) { + continue; + } + + data_map = ¤t_object.data.via.map; + + continuation_length = 0; + + for (entry_index = 0; entry_index < data_map->size; entry_index++) { + entry_key = data_map->ptr[entry_index].key; + entry_value = data_map->ptr[entry_index].val; + + result = flb_log_event_encoder_append_body_msgpack_object(context, + &entry_key); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + /* Check if this is the last entry in the map and if that is + * the case then add the lengths of all the trailing string + * objects after the map in order to append them to the value + * but only if the value object is a string + */ + if (entry_index + 1 == data_map->size && + entry_value.type == MSGPACK_OBJECT_STR) { + adjacent_object_offset = offset; + + while (msgpack_unpack_next( + &adjacent_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { + if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { + break; + } + + /* Sum total bytes to append */ + continuation_length += adjacent_object.data.via.str.size + 1; + } + + result = flb_log_event_encoder_append_body_string_length( + context, + entry_value.via.str.size + + continuation_length); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + (char *) entry_value.via.str.ptr, + entry_value.via.str.size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + if (continuation_length > 0) { + adjacent_object_offset = offset; + + while (msgpack_unpack_next( + &adjacent_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { + if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + "\n", + 1); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + (char *) adjacent_object.data.via.str.ptr, + adjacent_object.data.via.str.size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + } + } + } + else { + result = flb_log_event_encoder_append_body_msgpack_object(context, + &entry_value); + } + } + } + + msgpack_unpacked_destroy(¤t_object); + msgpack_unpacked_destroy(&adjacent_object); + + /* Reset status */ + file->mult_firstline = FLB_FALSE; + file->mult_skipping = FLB_FALSE; + file->mult_keys = 0; + file->mult_flush_timeout = 0; + + msgpack_sbuffer_destroy(&file->mult_sbuf); + + file->mult_sbuf.data = NULL; + + flb_time_zero(&file->mult_time); + + return result; +} + +/* Flush any multiline context data into outgoing buffers */ +int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) +{ + int result; + + /* nothing to flush */ + if (file->mult_firstline == FLB_FALSE) { + return -1; + } + + if (file->mult_keys == 0) { + return -1; + } + + result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->ml_log_event_encoder, &file->mult_time); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_tail_mult_pack_line_body( + file->ml_log_event_encoder, + file); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record( + file->ml_log_event_encoder); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->ml_log_event_encoder->output_buffer, + file->ml_log_event_encoder->output_length); + result = 0; + } + else { + flb_plg_error(file->config->ins, "error packing event : %d", result); + + result = -1; + } + + flb_log_event_encoder_reset(file->ml_log_event_encoder); + + return result; +} + +static void file_pending_flush(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t now) +{ + if (file->mult_flush_timeout > now) { + return; + } + + if (file->mult_firstline == FLB_FALSE) { + if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) { + return; + } + } + + flb_tail_mult_flush(file, ctx); +} + +int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx) +{ + time_t expired; + struct mk_list *head; + struct flb_tail_file *file; + + expired = time(NULL) + 3600; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + return 0; +} + +int flb_tail_mult_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + time_t now; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + + now = time(NULL); + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + file_pending_flush(ctx, file, now); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + file_pending_flush(ctx, file, now); + } + + return 0; +} diff --git a/fluent-bit/plugins/in_tail/tail_multiline.h b/fluent-bit/plugins/in_tail/tail_multiline.h new file mode 100644 index 000000000..d7f7539b1 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_multiline.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_TAIL_MULT_H +#define FLB_TAIL_TAIL_MULT_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> + +#include "tail_config.h" +#include "tail_file.h" + +#define FLB_TAIL_MULT_NA -1 /* not applicable as a multiline stream */ +#define FLB_TAIL_MULT_DONE 0 /* finished a multiline stream */ +#define FLB_TAIL_MULT_MORE 1 /* expect more lines to come */ +#define FLB_TAIL_MULT_FLUSH "4" /* max flush time for multiline: 4 seconds */ + +struct flb_tail_mult { + struct flb_parser *parser; + struct mk_list _head; +}; + +int flb_tail_mult_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, + struct flb_config *config); + +int flb_tail_mult_destroy(struct flb_tail_config *ctx); + +int flb_tail_mult_process_content(time_t now, + char *buf, size_t len, + struct flb_tail_file *file, + struct flb_tail_config *ctx, + size_t processed_bytes); +int flb_tail_mult_flush(struct flb_tail_file *file, + struct flb_tail_config *ctx); + +int flb_tail_mult_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context); +int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_scan.c b/fluent-bit/plugins/in_tail/tail_scan.c new file mode 100644 index 000000000..ccb8e070a --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_scan.c @@ -0,0 +1,71 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include "tail.h" +#include "tail_config.h" + +/* + * Include proper scan backend + */ +#ifdef FLB_SYSTEM_WINDOWS +#include "tail_scan_win32.c" +#else +#include "tail_scan_glob.c" +#endif + +int flb_tail_scan(struct mk_list *path_list, struct flb_tail_config *ctx) +{ + int ret; + struct mk_list *head; + struct flb_slist_entry *pattern; + + mk_list_foreach(head, path_list) { + pattern = mk_list_entry(head, struct flb_slist_entry, _head); + ret = tail_scan_path(pattern->str, ctx); + if (ret == -1) { + flb_plg_warn(ctx->ins, "error scanning path: %s", pattern->str); + } + else { + flb_plg_debug(ctx->ins, "%i new files found on path '%s'", + ret, pattern->str); + } + } + + return 0; +} + +/* + * Triggered by refresh_interval, it re-scan the path looking for new files + * that match the original path pattern. + */ +int flb_tail_scan_callback(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + int ret; + struct flb_tail_config *ctx = context; + (void) config; + + ret = flb_tail_scan(ctx->path_list, ctx); + if (ret > 0) { + flb_plg_debug(ins, "%i new files found", ret); + } + + return ret; +} diff --git a/fluent-bit/plugins/in_tail/tail_scan.h b/fluent-bit/plugins/in_tail/tail_scan.h new file mode 100644 index 000000000..ec3c96a2a --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_scan.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_SCAN_H +#define FLB_TAIL_SCAN_H + +#include "tail_config.h" + +int flb_tail_scan(struct mk_list *path, struct flb_tail_config *ctx); +int flb_tail_scan_callback(struct flb_input_instance *ins, + struct flb_config *config, void *context); + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_scan_glob.c b/fluent-bit/plugins/in_tail/tail_scan_glob.c new file mode 100644 index 000000000..b330b7c3b --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_scan_glob.c @@ -0,0 +1,278 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <glob.h> +#include <fnmatch.h> + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_utils.h> + +#include "tail.h" +#include "tail_file.h" +#include "tail_signal.h" +#include "tail_scan.h" +#include "tail_config.h" + +/* Define missing GLOB_TILDE if not exists */ +#ifndef GLOB_TILDE +#define GLOB_TILDE 1<<2 /* use GNU Libc value */ +#define UNSUP_TILDE 1 + +/* we need these extra headers for path resolution */ +#include <limits.h> +#include <sys/types.h> +#include <pwd.h> + +static char *expand_tilde(const char *path) +{ + int len; + char user[256]; + char *p = NULL; + char *dir = NULL; + char *tmp = NULL; + struct passwd *uinfo = NULL; + + if (path[0] == '~') { + p = strchr(path, '/'); + + if (p) { + /* check case '~/' */ + if ((p - path) == 1) { + dir = getenv("HOME"); + if (!dir) { + return path; + } + } + else { + /* + * it refers to a different user: ~user/abc, first step grab + * the user name. + */ + len = (p - path) - 1; + memcpy(user, path + 1, len); + user[len] = '\0'; + + /* use getpwnam() to resolve user information */ + uinfo = getpwnam(user); + if (!uinfo) { + return path; + } + + dir = uinfo->pw_dir; + } + } + else { + dir = getenv("HOME"); + if (!dir) { + return path; + } + } + + if (p) { + tmp = flb_malloc(PATH_MAX); + if (!tmp) { + flb_errno(); + return NULL; + } + snprintf(tmp, PATH_MAX - 1, "%s%s", dir, p); + } + else { + dir = getenv("HOME"); + if (!dir) { + return path; + } + + tmp = flb_strdup(dir); + if (!tmp) { + return path; + } + } + + return tmp; + } + + return path; +} +#endif + +static int tail_is_excluded(char *path, struct flb_tail_config *ctx) +{ + struct mk_list *head; + struct flb_slist_entry *pattern; + + if (!ctx->exclude_list) { + return FLB_FALSE; + } + + mk_list_foreach(head, ctx->exclude_list) { + pattern = mk_list_entry(head, struct flb_slist_entry, _head); + if (fnmatch(pattern->str, path, 0) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static inline int do_glob(const char *pattern, int flags, + void *not_used, glob_t *pglob) +{ + int ret; + int new_flags; + char *tmp = NULL; + int tmp_needs_free = FLB_FALSE; + (void) not_used; + + /* Save current values */ + new_flags = flags; + + if (flags & GLOB_TILDE) { +#ifdef UNSUP_TILDE + /* + * Some libc libraries like Musl do not support GLOB_TILDE for tilde + * expansion. A workaround is to use wordexp(3) but looking at it + * implementation in Musl it looks quite expensive: + * + * http://git.musl-libc.org/cgit/musl/tree/src/misc/wordexp.c + * + * the workaround is to do our own tilde expansion in a temporary buffer. + */ + + /* Look for a tilde */ + tmp = expand_tilde(pattern); + if (tmp != pattern) { + /* the path was expanded */ + pattern = tmp; + tmp_needs_free = FLB_TRUE; + } + + /* remove unused flag */ + new_flags &= ~GLOB_TILDE; +#endif + } + + /* invoke glob with new parameters */ + ret = glob(pattern, new_flags, NULL, pglob); + + /* remove temporary buffer, if allocated by expand_tilde above. + * Note that this buffer is only used for libc implementations + * that do not support the GLOB_TILDE flag, like musl. */ + if ((tmp != NULL) && (tmp_needs_free == FLB_TRUE)) { + flb_free(tmp); + } + + return ret; +} + +/* Scan a path, register the entries and return how many */ +static int tail_scan_path(const char *path, struct flb_tail_config *ctx) +{ + int i; + int ret; + int count = 0; + glob_t globbuf; + time_t now; + int64_t mtime; + struct stat st; + + flb_plg_debug(ctx->ins, "scanning path %s", path); + + /* Safe reset for globfree() */ + globbuf.gl_pathv = NULL; + + /* Scan the given path */ + ret = do_glob(path, GLOB_TILDE | GLOB_ERR, NULL, &globbuf); + if (ret != 0) { + switch (ret) { + case GLOB_NOSPACE: + flb_plg_error(ctx->ins, "no memory space available"); + return -1; + case GLOB_ABORTED: + flb_plg_error(ctx->ins, "read error, check permissions: %s", path); + return -1; + case GLOB_NOMATCH: + ret = stat(path, &st); + if (ret == -1) { + flb_plg_debug(ctx->ins, "cannot read info from: %s", path); + } + else { + ret = access(path, R_OK); + if (ret == -1 && errno == EACCES) { + flb_plg_error(ctx->ins, "NO read access for path: %s", path); + } + else { + flb_plg_debug(ctx->ins, "NO matches for path: %s", path); + } + } + return 0; + } + } + + + /* For every entry found, generate an output list */ + now = time(NULL); + for (i = 0; i < globbuf.gl_pathc; i++) { + ret = stat(globbuf.gl_pathv[i], &st); + if (ret == 0 && S_ISREG(st.st_mode)) { + /* Check if this file is blacklisted */ + if (tail_is_excluded(globbuf.gl_pathv[i], ctx) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "excluded=%s", globbuf.gl_pathv[i]); + continue; + } + + if (ctx->ignore_older > 0) { + mtime = flb_tail_stat_mtime(&st); + if (mtime > 0) { + if ((now - ctx->ignore_older) > mtime) { + flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)", + globbuf.gl_pathv[i]); + continue; + } + } + } + + /* Append file to list */ + ret = flb_tail_file_append(globbuf.gl_pathv[i], &st, + FLB_TAIL_STATIC, ctx); + if (ret == 0) { + flb_plg_debug(ctx->ins, "scan_glob add(): %s, inode %li", + globbuf.gl_pathv[i], st.st_ino); + count++; + } + else { + flb_plg_debug(ctx->ins, "scan_blog add(): dismissed: %s, inode %li", + globbuf.gl_pathv[i], st.st_ino); + } + } + else { + flb_plg_debug(ctx->ins, "skip (invalid) entry=%s", + globbuf.gl_pathv[i]); + } + } + + if (count > 0) { + tail_signal_manager(ctx); + } + + globfree(&globbuf); + return count; +} diff --git a/fluent-bit/plugins/in_tail/tail_scan_win32.c b/fluent-bit/plugins/in_tail/tail_scan_win32.c new file mode 100644 index 000000000..94733f065 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_scan_win32.c @@ -0,0 +1,245 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file implements glob-like patch matching feature for Windows + * based on Win32 API. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_utils.h> + +#include <shlwapi.h> + +#include "tail.h" +#include "tail_file.h" +#include "tail_signal.h" +#include "tail_config.h" + +#include "win32.h" + +static int tail_is_excluded(char *path, struct flb_tail_config *ctx) +{ + struct mk_list *head; + struct flb_slist_entry *pattern; + + if (!ctx->exclude_list) { + return FLB_FALSE; + } + + mk_list_foreach(head, ctx->exclude_list) { + pattern = mk_list_entry(head, struct flb_slist_entry, _head); + if (PathMatchSpecA(path, pattern->str)) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +/* + * This function is a thin wrapper over flb_tail_file_append(), + * adding normalization and sanity checks on top of it. + */ +static int tail_register_file(const char *target, struct flb_tail_config *ctx, + time_t ts) +{ + int64_t mtime; + struct stat st; + char path[MAX_PATH]; + + if (_fullpath(path, target, MAX_PATH) == NULL) { + flb_plg_error(ctx->ins, "cannot get absolute path of %s", target); + return -1; + } + + if (stat(path, &st) != 0 || !S_ISREG(st.st_mode)) { + return -1; + } + + if (ctx->ignore_older > 0) { + mtime = flb_tail_stat_mtime(&st); + if (mtime > 0) { + if ((ts - ctx->ignore_older) > mtime) { + flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)", + target); + return -1; + } + } + } + + if (tail_is_excluded(path, ctx) == FLB_TRUE) { + flb_plg_trace(ctx->ins, "skip '%s' (excluded)", path); + return -1; + } + + return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ctx); +} + +/* + * Perform patern match on the given path string. This function + * supports patterns with "nested" wildcards like below. + * + * tail_scan_pattern("C:\fluent-bit\*\*.txt", ctx); + * + * On success, the number of files found is returned (zero indicates + * "no file found"). On error, -1 is returned. + */ +static int tail_scan_pattern(const char *path, struct flb_tail_config *ctx) +{ + char *star, *p0, *p1; + char pattern[MAX_PATH]; + char buf[MAX_PATH]; + int ret; + int n_added = 0; + time_t now; + int64_t mtime; + HANDLE h; + WIN32_FIND_DATA data; + + if (strlen(path) > MAX_PATH - 1) { + flb_plg_error(ctx->ins, "path too long '%s'"); + return -1; + } + + star = strchr(path, '*'); + if (star == NULL) { + return -1; + } + + /* + * C:\data\tmp\input_*.conf + * 0<-----| + */ + p0 = star; + while (path <= p0 && *p0 != '\\') { + p0--; + } + + /* + * C:\data\tmp\input_*.conf + * |---->1 + */ + p1 = star; + while (*p1 && *p1 != '\\') { + p1++; + } + + memcpy(pattern, path, (p1 - path)); + pattern[p1 - path] = '\0'; + + h = FindFirstFileA(pattern, &data); + if (h == INVALID_HANDLE_VALUE) { + return 0; /* none matched */ + } + + now = time(NULL); + do { + /* Ignore the current and parent dirs */ + if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) { + continue; + } + + /* Avoid an infinite loop */ + if (strchr(data.cFileName, '*')) { + continue; + } + + /* Create a path (prefix + filename + suffix) */ + memcpy(buf, path, p0 - path + 1); + buf[p0 - path + 1] = '\0'; + + if (strlen(buf) + strlen(data.cFileName) + strlen(p1) > MAX_PATH - 1) { + flb_plg_warn(ctx->ins, "'%s%s%s' is too long", buf, data.cFileName, p1); + continue; + } + strcat(buf, data.cFileName); + strcat(buf, p1); + + if (strchr(p1, '*')) { + ret = tail_scan_pattern(buf, ctx); /* recursive */ + if (ret >= 0) { + n_added += ret; + } + continue; + } + + /* Try to register the target file */ + ret = tail_register_file(buf, ctx, now); + if (ret == 0) { + n_added++; + } + } while (FindNextFileA(h, &data) != 0); + + FindClose(h); + return n_added; +} + +static int tail_filepath(char *buf, int len, const char *basedir, const char *filename) +{ + char drive[_MAX_DRIVE]; + char dir[_MAX_DIR]; + char fname[_MAX_FNAME]; + char ext[_MAX_EXT]; + char tmp[MAX_PATH]; + int ret; + + ret = _splitpath_s(basedir, drive, _MAX_DRIVE, dir, _MAX_DIR, NULL, 0, NULL, 0); + if (ret) { + return -1; + } + + ret = _splitpath_s(filename, NULL, 0, NULL, 0, fname, _MAX_FNAME, ext, _MAX_EXT); + if (ret) { + return -1; + } + + ret = _makepath_s(tmp, MAX_PATH, drive, dir, fname, ext); + if (ret) { + return -1; + } + + if (_fullpath(buf, tmp, len) == NULL) { + return -1; + } + + return 0; +} + +static int tail_scan_path(const char *path, struct flb_tail_config *ctx) +{ + int ret; + int n_added = 0; + time_t now; + + if (strchr(path, '*')) { + return tail_scan_pattern(path, ctx); + } + + /* No wildcard involved. Let's just handle the file... */ + now = time(NULL); + ret = tail_register_file(path, ctx, now); + if (ret == 0) { + n_added++; + } + + return n_added; +} diff --git a/fluent-bit/plugins/in_tail/tail_signal.h b/fluent-bit/plugins/in_tail/tail_signal.h new file mode 100644 index 000000000..1a81fec64 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_signal.h @@ -0,0 +1,98 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_SIGNAL_H +#define FLB_TAIL_SIGNAL_H + +#include "tail_config.h" + +static inline int tail_signal_manager(struct flb_tail_config *ctx) +{ + int n; + uint64_t val = 0xc001; + + /* + * The number of signal reads might be less than the written signals, this + * means that some event is still pending in the queue. On that case we + * don't need to signal it again. + */ + if (ctx->ch_reads < ctx->ch_writes) { + return 1; + } + + /* Reset counters: prevent an overflow, unlikely..but let's keep safe */ + if (ctx->ch_reads == ctx->ch_writes) { + ctx->ch_reads = 0; + ctx->ch_writes = 0; + } + + /* Insert a dummy event into the channel manager */ + n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val)); + if (n == -1) { + flb_errno(); + return -1; + } + else { + ctx->ch_writes++; + } + + return n; +} + +static inline int tail_signal_pending(struct flb_tail_config *ctx) +{ + int n; + uint64_t val = 0xc002; + + /* Insert a dummy event into the 'pending' channel */ + n = flb_pipe_w(ctx->ch_pending[1], (const char *) &val, sizeof(val)); + + /* + * If we get EAGAIN, it simply means pending channel is full. As + * notification is already pending, it's safe to ignore. + */ + if (n == -1 && !FLB_PIPE_WOULDBLOCK()) { + flb_errno(); + return -1; + } + + return n; +} + +static inline int tail_consume_pending(struct flb_tail_config *ctx) +{ + int ret; + uint64_t val; + + /* + * We need to consume the pending bytes. Loop until we would have + * blocked (pipe is empty). + */ + do { + ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val)); + if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) { + flb_errno(); + return -1; + } + } while (!FLB_PIPE_WOULDBLOCK()); + + return 0; +} + +#endif diff --git a/fluent-bit/plugins/in_tail/tail_sql.h b/fluent-bit/plugins/in_tail/tail_sql.h new file mode 100644 index 000000000..855933a01 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_sql.h @@ -0,0 +1,65 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_SQL_H +#define FLB_TAIL_SQL_H + +/* + * In Fluent Bit we try to have a common convention for table names, + * if the table belong to an input/output plugin, use plugin name + * plus to what it's about, e.g: + * + * in_tail plugin table to track files: in_tail_files + */ +#define SQL_CREATE_FILES \ + "CREATE TABLE IF NOT EXISTS in_tail_files (" \ + " id INTEGER PRIMARY KEY," \ + " name TEXT NOT NULL," \ + " offset INTEGER," \ + " inode INTEGER," \ + " created INTEGER," \ + " rotated INTEGER DEFAULT 0" \ + ");" + +#define SQL_GET_FILE \ + "SELECT * from in_tail_files WHERE inode=@inode order by id desc;" + +#define SQL_INSERT_FILE \ + "INSERT INTO in_tail_files (name, offset, inode, created)" \ + " VALUES (@name, @offset, @inode, @created);" + +#define SQL_ROTATE_FILE \ + "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;" + +#define SQL_UPDATE_OFFSET \ + "UPDATE in_tail_files set offset=@offset WHERE id=@id;" + +#define SQL_DELETE_FILE \ + "DELETE FROM in_tail_files WHERE id=@id;" + +#define SQL_PRAGMA_SYNC \ + "PRAGMA synchronous=%i;" + +#define SQL_PRAGMA_JOURNAL_MODE \ + "PRAGMA journal_mode=%s;" + +#define SQL_PRAGMA_LOCKING_MODE \ + "PRAGMA locking_mode=EXCLUSIVE;" + +#endif diff --git a/fluent-bit/plugins/in_tail/win32.h b/fluent-bit/plugins/in_tail/win32.h new file mode 100644 index 000000000..a9414f892 --- /dev/null +++ b/fluent-bit/plugins/in_tail/win32.h @@ -0,0 +1,67 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This is the interface file that replaces POSIX functions + * with our own custom implementation. + */ + +#ifndef FLB_TAIL_WIN32_H +#define FLB_TAIL_WIN32_H + +#include "win32/interface.h" + +#undef open +#undef stat +#undef lstat +#undef fstat +#undef lseek + +#undef S_IFDIR +#undef S_IFCHR +#undef S_IFIFO +#undef S_IFREG +#undef S_IFLNK +#undef S_IFMT +#undef S_ISDIR +#undef S_ISCHR +#undef S_ISFIFO +#undef S_ISREG +#undef S_ISLNK + +#define open win32_open +#define stat win32_stat +#define lstat win32_lstat +#define fstat win32_fstat + +#define lseek _lseeki64 + +#define S_IFDIR WIN32_S_IFDIR +#define S_IFCHR WIN32_S_IFCHR +#define S_IFIFO WIN32_S_IFIFO +#define S_IFREG WIN32_S_IFREG +#define S_IFLNK WIN32_S_IFLNK +#define S_IFMT WIN32_S_IFMT + +#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) +#define S_ISCHR(m) (((m) & S_IFMT) == S_IFCHR) +#define S_ISIFO(m) (((m) & S_IFMT) == S_IFIFO) +#define S_ISREG(m) (((m) & S_IFMT) == S_IFREG) +#define S_ISLNK(m) (((m) & S_IFMT) == S_IFLNK) +#endif diff --git a/fluent-bit/plugins/in_tail/win32/interface.h b/fluent-bit/plugins/in_tail/win32/interface.h new file mode 100644 index 000000000..73b2ef233 --- /dev/null +++ b/fluent-bit/plugins/in_tail/win32/interface.h @@ -0,0 +1,44 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_TAIL_WIN32_INTERFACE_H +#define FLB_TAIL_WIN32_INTERFACE_H + +struct win32_stat { + uint64_t st_ino; + uint16_t st_mode; + int64_t st_mtime; + int16_t st_nlink; + int64_t st_size; +}; + +int win32_stat(const char *path, struct win32_stat *wst); +int win32_lstat(const char *path, struct win32_stat *wst); +int win32_fstat(int fd, struct win32_stat *wst); + +int win32_open(const char *path, int flags); + +#define WIN32_S_IFDIR 0x1000 +#define WIN32_S_IFCHR 0x2000 +#define WIN32_S_IFIFO 0x4000 +#define WIN32_S_IFREG 0x8000 +#define WIN32_S_IFLNK 0xc000 +#define WIN32_S_IFMT 0xf000 + +#endif diff --git a/fluent-bit/plugins/in_tail/win32/io.c b/fluent-bit/plugins/in_tail/win32/io.c new file mode 100644 index 000000000..45928b04a --- /dev/null +++ b/fluent-bit/plugins/in_tail/win32/io.c @@ -0,0 +1,47 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <Windows.h> +#include <stdlib.h> +#include <stdint.h> +#include <fcntl.h> +#include <io.h> +#include "interface.h" + +/* + * POSIX IO emulation tailored for in_tail's usage. + * + * open(2) that does not acquire an exclusive lock. + */ + +int win32_open(const char *path, int flags) +{ + HANDLE h; + h = CreateFileA(path, + GENERIC_READ, + FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, + NULL, /* lpSecurityAttributes */ + OPEN_EXISTING, /* dwCreationDisposition */ + 0, /* dwFlagsAndAttributes */ + NULL); /* hTemplateFile */ + if (h == INVALID_HANDLE_VALUE) { + return -1; + } + return _open_osfhandle((intptr_t) h, _O_RDONLY); +} diff --git a/fluent-bit/plugins/in_tail/win32/stat.c b/fluent-bit/plugins/in_tail/win32/stat.c new file mode 100644 index 000000000..bce802749 --- /dev/null +++ b/fluent-bit/plugins/in_tail/win32/stat.c @@ -0,0 +1,332 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <Windows.h> +#include <stdlib.h> +#include <stdint.h> +#include <io.h> +#include "interface.h" + +/* + * NTFS stat(2) emulation tailored for in_tail's usage. + * + * (1) Support st_ino (inode) for Windows NTFS. + * (2) Support NTFS symlinks. + * (3) Support large files >= 2GB. + * + * To use it, include "win32.h" and it will transparently + * replace stat(), lstat() and fstat(). + */ + +#define UINT64(high, low) ((uint64_t) (high) << 32 | (low)) +#define WINDOWS_TICKS_TO_SECONDS_RATIO 10000000 +#define WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA 11644473600 + +/* + * FILETIME timestamps are represented in 100-nanosecond intervals, + * because of this, that's why we need to divide the number by 10000000 + * in order to convert it to seconds. + * + * While UNIX timestamps use January 1, 1970 as epoch Windows FILETIME + * timestamps use January 1, 1601. Because of this we need to subtract + * 11644473600 seconds to account for it. + * + * Note: Even though this does not account for leap seconds it should be + * accurate enough. + */ + +static uint64_t filetime_to_epoch(FILETIME *ft) +{ + ULARGE_INTEGER timestamp; + + if (ft == NULL) { + return 0; + } + + timestamp.HighPart = ft->dwHighDateTime; + timestamp.LowPart = ft->dwLowDateTime; + + timestamp.QuadPart /= WINDOWS_TICKS_TO_SECONDS_RATIO; + timestamp.QuadPart -= WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA; + + return timestamp.QuadPart; +} + +static void reset_errno() +{ + errno = 0; +} + +static void propagate_last_error_to_errno() +{ + DWORD error_code; + + error_code = GetLastError(); + + switch (error_code) { + case ERROR_INVALID_TARGET_HANDLE: + case ERROR_INVALID_HANDLE: + errno = EBADF; + break; + + case ERROR_TOO_MANY_OPEN_FILES: + errno = EMFILE; + break; + + case ERROR_INVALID_FLAG_NUMBER: + case ERROR_INVALID_PARAMETER: + errno = EINVAL; + break; + + case ERROR_NOT_ENOUGH_MEMORY: + case ERROR_OUTOFMEMORY: + errno = ENOMEM; + break; + + case ERROR_SHARING_VIOLATION: + case ERROR_LOCK_VIOLATION: + case ERROR_PATH_BUSY: + case ERROR_BUSY: + errno = EBUSY; + break; + + case ERROR_HANDLE_DISK_FULL: + case ERROR_DISK_FULL: + errno = ENOSPC; + break; + + case ERROR_INVALID_ADDRESS: + errno = EFAULT; + break; + + case ERROR_FILE_TOO_LARGE: + errno = EFBIG; + break; + + case ERROR_ALREADY_EXISTS: + case ERROR_FILE_EXISTS: + errno = EEXIST; + break; + + case ERROR_FILE_NOT_FOUND: + case ERROR_PATH_NOT_FOUND: + case ERROR_INVALID_DRIVE: + case ERROR_BAD_PATHNAME: + case ERROR_INVALID_NAME: + case ERROR_BAD_UNIT: + errno = ENOENT; + break; + + case ERROR_SEEK_ON_DEVICE: + case ERROR_NEGATIVE_SEEK: + errno = ESPIPE; + break; + + case ERROR_ACCESS_DENIED: + errno = EACCES; + break; + + case ERROR_DIR_NOT_EMPTY: + errno = ENOTEMPTY; + break; + + case ERROR_BROKEN_PIPE: + errno = EPIPE; + break; + + case ERROR_GEN_FAILURE: + errno = EIO; + break; + + case ERROR_OPEN_FAILED: + errno = EIO; + break; + + case ERROR_SUCCESS: + errno = 0; + break; + + default: + /* This is just a canary, if you find this + * error then it means we need to expand the + * translation list. + */ + + errno = EOWNERDEAD; + break; + } +} + +static int get_mode(unsigned int attr) +{ + if (attr & FILE_ATTRIBUTE_DIRECTORY) { + return WIN32_S_IFDIR; + } + return WIN32_S_IFREG; +} + + + +static int is_symlink(const char *path) +{ + WIN32_FIND_DATA data; + HANDLE h; + + SetLastError(0); + reset_errno(); + + h = FindFirstFileA(path, &data); + + if (h == INVALID_HANDLE_VALUE) { + propagate_last_error_to_errno(); + + return 0; + } + + FindClose(h); + + /* + * A NTFS symlink is a file with a bit of metadata ("reparse point"), + * So (1) check if the file has metadata and then (2) confirm that + * it is indeed a symlink. + */ + if (data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { + if (data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) { + return 1; + } + } + + return 0; +} + +static int hstat(HANDLE h, struct win32_stat *wst) +{ + BY_HANDLE_FILE_INFORMATION info; + FILE_STANDARD_INFO std; + + SetLastError(0); + reset_errno(); + + if (!GetFileInformationByHandle(h, &info)) { + propagate_last_error_to_errno(); + + return -1; + } + + if (!GetFileInformationByHandleEx(h, FileStandardInfo, + &std, sizeof(std))) { + propagate_last_error_to_errno(); + + return -1; + } + + wst->st_nlink = std.NumberOfLinks; + if (std.DeletePending) { + wst->st_nlink = 0; + } + + wst->st_mode = get_mode(info.dwFileAttributes); + wst->st_size = UINT64(info.nFileSizeHigh, info.nFileSizeLow); + wst->st_ino = UINT64(info.nFileIndexHigh, info.nFileIndexLow); + wst->st_mtime = filetime_to_epoch(&info.ftLastWriteTime); + + return 0; +} + +int win32_stat(const char *path, struct win32_stat *wst) +{ + HANDLE h; + + SetLastError(0); + reset_errno(); + + h = CreateFileA(path, + GENERIC_READ, + FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, + NULL, /* lpSecurityAttributes */ + OPEN_EXISTING, /* dwCreationDisposition */ + 0, /* dwFlagsAndAttributes */ + NULL); /* hTemplateFile */ + + if (h == INVALID_HANDLE_VALUE) { + propagate_last_error_to_errno(); + + return -1; + } + + if (hstat(h, wst)) { + CloseHandle(h); + return -1; + } + + CloseHandle(h); + return 0; +} + +int win32_lstat(const char *path, struct win32_stat *wst) +{ + HANDLE h; + + SetLastError(0); + reset_errno(); + + h = CreateFileA(path, + GENERIC_READ, + FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, + NULL, /* lpSecurityAttributes */ + OPEN_EXISTING, /* dwCreationDisposition */ + FILE_FLAG_OPEN_REPARSE_POINT, + NULL); /* hTemplateFile */ + + if (h == INVALID_HANDLE_VALUE) { + propagate_last_error_to_errno(); + + return -1; + } + + if (hstat(h, wst)) { + CloseHandle(h); + return -1; + } + + if (is_symlink(path)) { + wst->st_mode = WIN32_S_IFLNK; + } + + CloseHandle(h); + return 0; +} + +int win32_fstat(int fd, struct win32_stat *wst) +{ + HANDLE h; + + SetLastError(0); + reset_errno(); + + h = (HANDLE) _get_osfhandle(fd); + + if (h == INVALID_HANDLE_VALUE) { + propagate_last_error_to_errno(); + + return -1; + } + + return hstat(h, wst); +} |