summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_tail
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/in_tail
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_tail')
-rw-r--r--fluent-bit/plugins/in_tail/CMakeLists.txt37
-rw-r--r--fluent-bit/plugins/in_tail/tail.c783
-rw-r--r--fluent-bit/plugins/in_tail/tail.h45
-rw-r--r--fluent-bit/plugins/in_tail/tail_config.c472
-rw-r--r--fluent-bit/plugins/in_tail/tail_config.h168
-rw-r--r--fluent-bit/plugins/in_tail/tail_db.c277
-rw-r--r--fluent-bit/plugins/in_tail/tail_db.h43
-rw-r--r--fluent-bit/plugins/in_tail/tail_dockermode.c459
-rw-r--r--fluent-bit/plugins/in_tail/tail_dockermode.h38
-rw-r--r--fluent-bit/plugins/in_tail/tail_file.c1860
-rw-r--r--fluent-bit/plugins/in_tail/tail_file.h137
-rw-r--r--fluent-bit/plugins/in_tail/tail_file_internal.h130
-rw-r--r--fluent-bit/plugins/in_tail/tail_fs.h96
-rw-r--r--fluent-bit/plugins/in_tail/tail_fs_inotify.c433
-rw-r--r--fluent-bit/plugins/in_tail/tail_fs_inotify.h37
-rw-r--r--fluent-bit/plugins/in_tail/tail_fs_stat.c253
-rw-r--r--fluent-bit/plugins/in_tail/tail_fs_stat.h37
-rw-r--r--fluent-bit/plugins/in_tail/tail_multiline.c606
-rw-r--r--fluent-bit/plugins/in_tail/tail_multiline.h57
-rw-r--r--fluent-bit/plugins/in_tail/tail_scan.c71
-rw-r--r--fluent-bit/plugins/in_tail/tail_scan.h29
-rw-r--r--fluent-bit/plugins/in_tail/tail_scan_glob.c278
-rw-r--r--fluent-bit/plugins/in_tail/tail_scan_win32.c245
-rw-r--r--fluent-bit/plugins/in_tail/tail_signal.h98
-rw-r--r--fluent-bit/plugins/in_tail/tail_sql.h65
-rw-r--r--fluent-bit/plugins/in_tail/win32.h67
-rw-r--r--fluent-bit/plugins/in_tail/win32/interface.h44
-rw-r--r--fluent-bit/plugins/in_tail/win32/io.c47
-rw-r--r--fluent-bit/plugins/in_tail/win32/stat.c332
29 files changed, 0 insertions, 7244 deletions
diff --git a/fluent-bit/plugins/in_tail/CMakeLists.txt b/fluent-bit/plugins/in_tail/CMakeLists.txt
deleted file mode 100644
index 31d865218..000000000
--- a/fluent-bit/plugins/in_tail/CMakeLists.txt
+++ /dev/null
@@ -1,37 +0,0 @@
-set(src
- tail_file.c
- tail_dockermode.c
- tail_scan.c
- tail_config.c
- tail_fs_stat.c
- tail.c)
-
-if(FLB_HAVE_INOTIFY)
-set(src
- ${src}
- tail_fs_inotify.c)
-endif()
-
-if(FLB_SQLDB)
-set(src
- ${src}
- tail_db.c)
-endif()
-
-if(FLB_PARSER)
- set(src
- ${src}
- tail_multiline.c
- )
-endif()
-
-if(MSVC)
- set(src
- ${src}
- win32/stat.c
- win32/io.c
- )
- FLB_PLUGIN(in_tail "${src}" "Shlwapi")
-else()
- FLB_PLUGIN(in_tail "${src}" "")
-endif()
diff --git a/fluent-bit/plugins/in_tail/tail.c b/fluent-bit/plugins/in_tail/tail.c
deleted file mode 100644
index 34a0fec3d..000000000
--- a/fluent-bit/plugins/in_tail/tail.c
+++ /dev/null
@@ -1,783 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_config_map.h>
-#include <fluent-bit/flb_error.h>
-#include <fluent-bit/flb_utils.h>
-
-#include "tail.h"
-#include "tail_fs.h"
-#include "tail_db.h"
-#include "tail_file.h"
-#include "tail_scan.h"
-#include "tail_signal.h"
-#include "tail_config.h"
-#include "tail_dockermode.h"
-#include "tail_multiline.h"
-
-static inline int consume_byte(flb_pipefd_t fd)
-{
- int ret;
- uint64_t val;
-
- /* We need to consume the byte */
- ret = flb_pipe_r(fd, (char *) &val, sizeof(val));
- if (ret <= 0) {
- flb_errno();
- return -1;
- }
-
- return 0;
-}
-
-/* cb_collect callback */
-static int in_tail_collect_pending(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret;
- int active = 0;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_config *ctx = in_context;
- struct flb_tail_file *file;
- struct stat st;
- uint64_t pre;
- uint64_t total_processed = 0;
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
-
- if (file->watch_fd == -1 ||
- (file->offset >= file->size)) {
- ret = fstat(file->fd, &st);
-
- if (ret == -1) {
- flb_errno();
- flb_tail_file_remove(file);
- continue;
- }
-
- file->size = st.st_size;
- file->pending_bytes = (file->size - file->offset);
- }
- else {
- memset(&st, 0, sizeof(struct stat));
- }
-
- if (file->pending_bytes <= 0) {
- file->pending_bytes = (file->size - file->offset);
- }
-
- if (file->pending_bytes <= 0) {
- continue;
- }
-
- if (ctx->event_batch_size > 0 &&
- total_processed >= ctx->event_batch_size) {
- break;
- }
-
- /* get initial offset to calculate the number of processed bytes later */
- pre = file->offset;
-
- ret = flb_tail_file_chunk(file);
-
- /* Update the total number of bytes processed */
- if (file->offset > pre) {
- total_processed += (file->offset - pre);
- }
-
- switch (ret) {
- case FLB_TAIL_ERROR:
- /* Could not longer read the file */
- flb_tail_file_remove(file);
- break;
- case FLB_TAIL_OK:
- case FLB_TAIL_BUSY:
- /*
- * Adjust counter to verify if we need a further read(2) later.
- * For more details refer to tail_fs_inotify.c:96.
- */
- if (file->offset < file->size) {
- file->pending_bytes = (file->size - file->offset);
- active++;
- }
- else {
- file->pending_bytes = 0;
- }
- break;
- }
- }
-
- /* If no more active files, consume pending signal so we don't get called again. */
- if (active == 0) {
- tail_consume_pending(ctx);
- }
-
- return 0;
-}
-
-/* cb_collect callback */
-static int in_tail_collect_static(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret;
- int active = 0;
- int pre_size;
- int pos_size;
- int alter_size = 0;
- int completed = FLB_FALSE;
- char s_size[32];
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_config *ctx = in_context;
- struct flb_tail_file *file;
- uint64_t pre;
- uint64_t total_processed = 0;
-
- /* Do a data chunk collection for each file */
- mk_list_foreach_safe(head, tmp, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
-
- /*
- * The list 'files_static' represents all the files that were discovered
- * on startup that already contains data: these are called 'static files'.
- *
- * When processing static files, we don't know what kind of content they
- * have and what kind of 'latency' might add to process all of them in
- * a row. Despite we always 'try' to do a full round and process a
- * fraction of them on every invocation of this function if we have a
- * huge number of files we will face latency and make the main pipeline
- * to degrade performance.
- *
- * In order to avoid this situation, we added a new option to the plugin
- * called 'static_batch_size' which basically defines how many bytes can
- * be processed on every invocation to process the static files.
- *
- * When the limit is reached, we just break the loop and as a side effect
- * we allow other events keep processing.
- */
- if (ctx->static_batch_size > 0 &&
- total_processed >= ctx->static_batch_size) {
- break;
- }
-
- /* get initial offset to calculate the number of processed bytes later */
- pre = file->offset;
-
- /* Process the file */
- ret = flb_tail_file_chunk(file);
-
- /* Update the total number of bytes processed */
- if (file->offset > pre) {
- total_processed += (file->offset - pre);
- }
-
- switch (ret) {
- case FLB_TAIL_ERROR:
- /* Could not longer read the file */
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" collect static ERROR",
- file->inode);
- flb_tail_file_remove(file);
- break;
- case FLB_TAIL_OK:
- case FLB_TAIL_BUSY:
- active++;
- break;
- case FLB_TAIL_WAIT:
- if (file->config->exit_on_eof) {
- flb_plg_info(ctx->ins, "inode=%"PRIu64" file=%s ended, stop",
- file->inode, file->name);
- if (ctx->files_static_count == 1) {
- flb_engine_exit(config);
- }
- }
- /* Promote file to 'events' type handler */
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s promote to TAIL_EVENT",
- file->inode, file->name);
-
- /*
- * When promoting a file from 'static' to 'event' mode, the promoter
- * will check if the file has been rotated while it was being
- * processed on this function, if so, it will try to check for the
- * following condition:
- *
- * "discover a new possible file created due to rotation"
- *
- * If the condition above is met, a new file entry will be added to
- * the list that we are processing and a 'new signal' will be send
- * to the signal manager. But the signal manager will trigger the
- * message only if no pending messages exists (to avoid queue size
- * exhaustion).
- *
- * All good, but there is a corner case where if no 'active' files
- * exists, the signal will be read and this function will not be
- * called again and since the signal did not triggered the
- * message, the 'new file' enqueued by the nested function
- * might stay in stale mode (note that altering the length of this
- * list will not be reflected yet)
- *
- * To fix the corner case, we use a variable called 'alter_size'
- * that determinate if the size of the list keeps the same after
- * a rotation, so it means: a new file was added.
- *
- * We use 'alter_size' as a helper in the conditional below to know
- * when to stop processing the static list.
- */
- if (alter_size == 0) {
- pre_size = ctx->files_static_count;
- }
- ret = flb_tail_file_to_event(file);
- if (ret == -1) {
- flb_plg_debug(ctx->ins, "file=%s cannot promote, unregistering",
- file->name);
- flb_tail_file_remove(file);
- }
-
- if (alter_size == 0) {
- pos_size = ctx->files_static_count;
- if (pre_size == pos_size) {
- alter_size++;
- }
- }
- break;
- }
- }
-
- /*
- * If there are no more active static handlers, we consume the 'byte' that
- * triggered this event so this is not longer called again.
- */
- if (active == 0 && alter_size == 0) {
- consume_byte(ctx->ch_manager[0]);
- ctx->ch_reads++;
- completed = FLB_TRUE;
- }
-
- /* Debugging number of processed bytes */
- if (flb_log_check_level(ctx->ins->log_level, FLB_LOG_DEBUG)) {
- flb_utils_bytes_to_human_readable_size(total_processed,
- s_size, sizeof(s_size));
- if (completed) {
- flb_plg_debug(ctx->ins, "[static files] processed %s, done", s_size);
- }
- else {
- flb_plg_debug(ctx->ins, "[static files] processed %s", s_size);
- }
- }
-
- return 0;
-}
-
-static int in_tail_watcher_callback(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- int ret = 0;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_config *ctx = context;
- struct flb_tail_file *file;
- (void) config;
-
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- if (file->is_link == FLB_TRUE) {
- ret = flb_tail_file_is_rotated(ctx, file);
- if (ret == FLB_FALSE) {
- continue;
- }
-
- /* The symbolic link name has been rotated */
- flb_tail_file_rotated(file);
- }
- }
- return ret;
-}
-
-int in_tail_collect_event(void *file, struct flb_config *config)
-{
- int ret;
- struct stat st;
- struct flb_tail_file *f = file;
-
- ret = fstat(f->fd, &st);
- if (ret == -1) {
- flb_tail_file_remove(f);
- return 0;
- }
-
- ret = flb_tail_file_chunk(f);
- switch (ret) {
- case FLB_TAIL_ERROR:
- /* Could not longer read the file */
- flb_tail_file_remove(f);
- break;
- case FLB_TAIL_OK:
- case FLB_TAIL_WAIT:
- break;
- }
-
- return 0;
-}
-
-/* Initialize plugin */
-static int in_tail_init(struct flb_input_instance *in,
- struct flb_config *config, void *data)
-{
- int ret = -1;
- struct flb_tail_config *ctx = NULL;
-
- /* Allocate space for the configuration */
- ctx = flb_tail_config_create(in, config);
- if (!ctx) {
- return -1;
- }
- ctx->ins = in;
-
- /* Initialize file-system watcher */
- ret = flb_tail_fs_init(in, ctx, config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
-
- /* Scan path */
- flb_tail_scan(ctx->path_list, ctx);
-
- /*
- * After the first scan (on start time), all new files discovered needs to be
- * read from head, so we switch the 'read_from_head' flag to true so any
- * other file discovered after a scan or a rotation are read from the
- * beginning.
- */
- ctx->read_from_head = FLB_TRUE;
-
- /* Set plugin context */
- flb_input_set_context(in, ctx);
-
- /* Register an event collector */
- ret = flb_input_set_collector_event(in, in_tail_collect_static,
- ctx->ch_manager[0], config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_static = ret;
-
- /* Register re-scan: time managed by 'refresh_interval' property */
- ret = flb_input_set_collector_time(in, flb_tail_scan_callback,
- ctx->refresh_interval_sec,
- ctx->refresh_interval_nsec,
- config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_scan = ret;
-
- /* Register watcher, interval managed by 'watcher_interval' property */
- ret = flb_input_set_collector_time(in, in_tail_watcher_callback,
- ctx->watcher_interval, 0,
- config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_watcher = ret;
-
- /* Register callback to purge rotated files */
- ret = flb_input_set_collector_time(in, flb_tail_file_purge,
- ctx->rotate_wait, 0,
- config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_rotated = ret;
-
- /* Register callback to process pending bytes in promoted files */
- ret = flb_input_set_collector_event(in, in_tail_collect_pending,
- ctx->ch_pending[0], config);//1, 0, config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_pending = ret;
-
-
- if (ctx->multiline == FLB_TRUE && ctx->parser) {
- ctx->parser = NULL;
- flb_plg_warn(in, "on multiline mode 'Parser' is not allowed "
- "(parser disabled)");
- }
-
- /* Register callback to process docker mode queued buffer */
- if (ctx->docker_mode == FLB_TRUE) {
- ret = flb_input_set_collector_time(in, flb_tail_dmode_pending_flush,
- ctx->docker_mode_flush, 0,
- config);
- if (ret == -1) {
- ctx->docker_mode = FLB_FALSE;
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_dmode_flush = ret;
- }
-
-#ifdef FLB_HAVE_PARSER
- /* Register callback to process multiline queued buffer */
- if (ctx->multiline == FLB_TRUE) {
- ret = flb_input_set_collector_time(in, flb_tail_mult_pending_flush,
- ctx->multiline_flush, 0,
- config);
- if (ret == -1) {
- ctx->multiline = FLB_FALSE;
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_mult_flush = ret;
- }
-#endif
-
- return 0;
-}
-
-/* Pre-run callback / before the event loop */
-static int in_tail_pre_run(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- struct flb_tail_config *ctx = in_context;
- (void) ins;
-
- return tail_signal_manager(ctx);
-}
-
-static int in_tail_exit(void *data, struct flb_config *config)
-{
- (void) *config;
- struct flb_tail_config *ctx = data;
-
- flb_tail_file_remove_all(ctx);
- flb_tail_fs_exit(ctx);
- flb_tail_config_destroy(ctx);
-
- return 0;
-}
-
-static void in_tail_pause(void *data, struct flb_config *config)
-{
- struct flb_tail_config *ctx = data;
-
- /*
- * Pause general collectors:
- *
- * - static : static files lookup before promotion
- */
- flb_input_collector_pause(ctx->coll_fd_static, ctx->ins);
- flb_input_collector_pause(ctx->coll_fd_pending, ctx->ins);
-
- if (ctx->docker_mode == FLB_TRUE) {
- flb_input_collector_pause(ctx->coll_fd_dmode_flush, ctx->ins);
- if (config->is_ingestion_active == FLB_FALSE) {
- flb_plg_info(ctx->ins, "flushing pending docker mode data...");
- flb_tail_dmode_pending_flush_all(ctx);
- }
- }
-
- if (ctx->multiline == FLB_TRUE) {
- flb_input_collector_pause(ctx->coll_fd_mult_flush, ctx->ins);
- if (config->is_ingestion_active == FLB_FALSE) {
- flb_plg_info(ctx->ins, "flushing pending multiline data...");
- flb_tail_mult_pending_flush_all(ctx);
- }
- }
-
- /* Pause file system backend handlers */
- flb_tail_fs_pause(ctx);
-}
-
-static void in_tail_resume(void *data, struct flb_config *config)
-{
- struct flb_tail_config *ctx = data;
-
- flb_input_collector_resume(ctx->coll_fd_static, ctx->ins);
- flb_input_collector_resume(ctx->coll_fd_pending, ctx->ins);
-
- if (ctx->docker_mode == FLB_TRUE) {
- flb_input_collector_resume(ctx->coll_fd_dmode_flush, ctx->ins);
- }
-
- if (ctx->multiline == FLB_TRUE) {
- flb_input_collector_resume(ctx->coll_fd_mult_flush, ctx->ins);
- }
-
- /* Pause file system backend handlers */
- flb_tail_fs_resume(ctx);
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_CLIST, "path", NULL,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, path_list),
- "pattern specifying log files or multiple ones through "
- "the use of common wildcards."
- },
- {
- FLB_CONFIG_MAP_CLIST, "exclude_path", NULL,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, exclude_list),
- "Set one or multiple shell patterns separated by commas to exclude "
- "files matching a certain criteria, e.g: 'exclude_path *.gz,*.zip'"
- },
- {
- FLB_CONFIG_MAP_STR, "key", "log",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, key),
- "when a message is unstructured (no parser applied), it's appended "
- "as a string under the key name log. This option allows to define an "
- "alternative name for that key."
- },
- {
- FLB_CONFIG_MAP_BOOL, "read_from_head", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head),
- "For new discovered files on start (without a database offset/position), read the "
- "content from the head of the file, not tail."
- },
- {
- FLB_CONFIG_MAP_STR, "refresh_interval", "60",
- 0, FLB_FALSE, 0,
- "interval to refresh the list of watched files expressed in seconds."
- },
- {
- FLB_CONFIG_MAP_TIME, "watcher_interval", "2s",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval),
- },
- {
- FLB_CONFIG_MAP_TIME, "progress_check_interval", "2s",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval),
- },
- {
- FLB_CONFIG_MAP_INT, "progress_check_interval_nsec", "0",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval_nsec),
- },
- {
- FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait),
- "specify the number of extra time in seconds to monitor a file once is "
- "rotated in case some pending data is flushed."
- },
- {
- FLB_CONFIG_MAP_BOOL, "docker_mode", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode),
- "If enabled, the plugin will recombine split Docker log lines before "
- "passing them to any parser as configured above. This mode cannot be "
- "used at the same time as Multiline."
- },
- {
- FLB_CONFIG_MAP_INT, "docker_mode_flush", "4",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode_flush),
- "wait period time in seconds to flush queued unfinished split lines."
-
- },
-#ifdef FLB_HAVE_REGEX
- {
- FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL,
- 0, FLB_FALSE, 0,
- "specify the parser name to fetch log first line for muliline log"
- },
-#endif
- {
- FLB_CONFIG_MAP_STR, "path_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, path_key),
- "set the 'key' name where the name of monitored file will be appended."
- },
- {
- FLB_CONFIG_MAP_STR, "offset_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, offset_key),
- "set the 'key' name where the offset of monitored file will be appended."
- },
- {
- FLB_CONFIG_MAP_TIME, "ignore_older", "0",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, ignore_older),
- "ignore files older than 'ignore_older'. Supports m,h,d (minutes, "
- "hours, days) syntax. Default behavior is to read all the files."
- },
- {
- FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_TAIL_CHUNK,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_chunk_size),
- "set the initial buffer size to read data from files. This value is "
- "used too to increase buffer size."
- },
- {
- FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_TAIL_CHUNK,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_max_size),
- "set the limit of the buffer size per monitored file. When a buffer "
- "needs to be increased (e.g: very long lines), this value is used to "
- "restrict how much the memory buffer can grow. If reading a file exceed "
- "this limit, the file is removed from the monitored file list."
- },
- {
- FLB_CONFIG_MAP_SIZE, "static_batch_size", FLB_TAIL_STATIC_BATCH_SIZE,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, static_batch_size),
- "On start, Fluent Bit might process files which already contains data, "
- "these files are called 'static' files. The configuration property "
- "in question set's the maximum number of bytes to process per iteration "
- "for the static files monitored."
- },
- {
- FLB_CONFIG_MAP_SIZE, "event_batch_size", FLB_TAIL_EVENT_BATCH_SIZE,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, event_batch_size),
- "When Fluent Bit is processing files in event based mode the amount of"
- "data available for consumption could be too much and cause the input plugin "
- "to over extend and smother other plugins"
- "The configuration property sets the maximum number of bytes to process per iteration "
- "for the files monitored (in event mode)."
- },
- {
- FLB_CONFIG_MAP_BOOL, "skip_long_lines", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_long_lines),
- "if a monitored file reach it buffer capacity due to a very long line "
- "(buffer_max_size), the default behavior is to stop monitoring that "
- "file. This option alter that behavior and instruct Fluent Bit to skip "
- "long lines and continue processing other lines that fits into the buffer."
- },
- {
- FLB_CONFIG_MAP_BOOL, "exit_on_eof", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, exit_on_eof),
- "exit Fluent Bit when reaching EOF on a monitored file."
- },
-
- {
- FLB_CONFIG_MAP_BOOL, "skip_empty_lines", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
- "Allows to skip empty lines."
- },
-
-#ifdef FLB_HAVE_INOTIFY
- {
- FLB_CONFIG_MAP_BOOL, "inotify_watcher", "true",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, inotify_watcher),
- "set to false to use file stat watcher instead of inotify."
- },
-#endif
-#ifdef FLB_HAVE_REGEX
- {
- FLB_CONFIG_MAP_STR, "parser", NULL,
- 0, FLB_FALSE, 0,
- "specify the parser name to process an unstructured message."
- },
- {
- FLB_CONFIG_MAP_STR, "tag_regex", NULL,
- 0, FLB_FALSE, 0,
- "set a regex to extract fields from the file name and use them later to "
- "compose the Tag."
- },
-#endif
-
-#ifdef FLB_HAVE_SQLDB
- {
- FLB_CONFIG_MAP_STR, "db", NULL,
- 0, FLB_FALSE, 0,
- "set a database file to keep track of monitored files and it offsets."
- },
- {
- FLB_CONFIG_MAP_STR, "db.sync", "normal",
- 0, FLB_FALSE, 0,
- "set a database sync method. values: extra, full, normal and off."
- },
- {
- FLB_CONFIG_MAP_BOOL, "db.locking", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, db_locking),
- "set exclusive locking mode, increase performance but don't allow "
- "external connections to the database file."
- },
- {
- FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, db_journal_mode),
- "Option to provide WAL configuration for Work Ahead Logging mechanism (WAL). Enabling WAL "
- "provides higher performance. Note that WAL is not compatible with "
- "shared network file systems."
- },
-#endif
-
- /* Multiline Options */
-#ifdef FLB_HAVE_PARSER
- {
- FLB_CONFIG_MAP_BOOL, "multiline", "false",
- 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline),
- "if enabled, the plugin will try to discover multiline messages and use "
- "the proper parsers to compose the outgoing messages. Note that when this "
- "option is enabled the Parser option is not used."
- },
- {
- FLB_CONFIG_MAP_TIME, "multiline_flush", FLB_TAIL_MULT_FLUSH,
- 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline_flush),
- "wait period time in seconds to process queued multiline messages."
- },
- {
- FLB_CONFIG_MAP_STR, "parser_firstline", NULL,
- 0, FLB_FALSE, 0,
- "name of the parser that matches the beginning of a multiline message. "
- "Note that the regular expression defined in the parser must include a "
- "group name (named capture)."
- },
- {
- FLB_CONFIG_MAP_STR_PREFIX, "parser_", NULL,
- 0, FLB_FALSE, 0,
- "optional extra parser to interpret and structure multiline entries. This "
- "option can be used to define multiple parsers, e.g: Parser_1 ab1, "
- "Parser_2 ab2, Parser_N abN."
- },
-
- /* Multiline Core Engine based API */
- {
- FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL,
- FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_tail_config, multiline_parsers),
- "specify one or multiple multiline parsers: docker, cri, go, java, etc."
- },
-#endif
-
- /* EOF */
- {0}
-};
-
-struct flb_input_plugin in_tail_plugin = {
- .name = "tail",
- .description = "Tail files",
- .cb_init = in_tail_init,
- .cb_pre_run = in_tail_pre_run,
- .cb_collect = NULL,
- .cb_flush_buf = NULL,
- .cb_pause = in_tail_pause,
- .cb_resume = in_tail_resume,
- .cb_exit = in_tail_exit,
- .config_map = config_map,
- .flags = 0
-};
diff --git a/fluent-bit/plugins/in_tail/tail.h b/fluent-bit/plugins/in_tail/tail.h
deleted file mode 100644
index 074f7a49f..000000000
--- a/fluent-bit/plugins/in_tail/tail.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_H
-#define FLB_TAIL_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-
-/* Internal return values */
-#define FLB_TAIL_ERROR -1
-#define FLB_TAIL_OK 0
-#define FLB_TAIL_WAIT 1
-#define FLB_TAIL_BUSY 2
-
-/* Consuming mode */
-#define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */
-#define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */
-
-/* Config */
-#define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */
-#define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */
-#define FLB_TAIL_ROTATE_WAIT "5" /* time to monitor after rotation */
-#define FLB_TAIL_STATIC_BATCH_SIZE "50M" /* static batch size */
-#define FLB_TAIL_EVENT_BATCH_SIZE "50M" /* event batch size */
-
-int in_tail_collect_event(void *file, struct flb_config *config);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_config.c b/fluent-bit/plugins/in_tail/tail_config.c
deleted file mode 100644
index 360b3dbea..000000000
--- a/fluent-bit/plugins/in_tail/tail_config.c
+++ /dev/null
@@ -1,472 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/multiline/flb_ml.h>
-#include <fluent-bit/multiline/flb_ml_parser.h>
-
-#include <stdlib.h>
-#include <fcntl.h>
-
-#include "tail_fs.h"
-#include "tail_db.h"
-#include "tail_config.h"
-#include "tail_scan.h"
-#include "tail_sql.h"
-#include "tail_dockermode.h"
-
-#ifdef FLB_HAVE_PARSER
-#include "tail_multiline.h"
-#endif
-
-static int multiline_load_parsers(struct flb_tail_config *ctx)
-{
- struct mk_list *head;
- struct mk_list *head_p;
- struct flb_config_map_val *mv;
- struct flb_slist_entry *val = NULL;
- struct flb_ml_parser_ins *parser_i;
-
- if (!ctx->multiline_parsers) {
- return 0;
- }
-
- /* Create Multiline context using the plugin instance name */
- ctx->ml_ctx = flb_ml_create(ctx->config, ctx->ins->name);
- if (!ctx->ml_ctx) {
- return -1;
- }
-
- /*
- * Iterate all 'multiline.parser' entries. Every entry is considered
- * a group which can have multiple multiline parser instances.
- */
- flb_config_map_foreach(head, mv, ctx->multiline_parsers) {
- mk_list_foreach(head_p, mv->val.list) {
- val = mk_list_entry(head_p, struct flb_slist_entry, _head);
-
- /* Create an instance of the defined parser */
- parser_i = flb_ml_parser_instance_create(ctx->ml_ctx, val->str);
- if (!parser_i) {
- return -1;
- }
- }
- }
-
- return 0;
-}
-
-struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
- struct flb_config *config)
-{
- int ret;
- int sec;
- int i;
- long nsec;
- const char *tmp;
- struct flb_tail_config *ctx;
-
- ctx = flb_calloc(1, sizeof(struct flb_tail_config));
- if (!ctx) {
- flb_errno();
- return NULL;
- }
- ctx->config = config;
- ctx->ins = ins;
- ctx->ignore_older = 0;
- ctx->skip_long_lines = FLB_FALSE;
-#ifdef FLB_HAVE_SQLDB
- ctx->db_sync = 1; /* sqlite sync 'normal' */
-#endif
-
- /* Load the config map */
- ret = flb_input_config_map_set(ins, (void *) ctx);
- if (ret == -1) {
- flb_free(ctx);
- return NULL;
- }
-
- /* Create the channel manager */
- ret = flb_pipe_create(ctx->ch_manager);
- if (ret == -1) {
- flb_errno();
- flb_free(ctx);
- return NULL;
- }
- ctx->ch_reads = 0;
- ctx->ch_writes = 0;
-
- /* Create the pending channel */
- ret = flb_pipe_create(ctx->ch_pending);
- if (ret == -1) {
- flb_errno();
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- /* Make pending channel non-blocking */
- for (i = 0; i <= 1; i++) {
- ret = flb_pipe_set_nonblocking(ctx->ch_pending[i]);
- if (ret == -1) {
- flb_errno();
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-
- /* Config: path/pattern to read files */
- if (!ctx->path_list || mk_list_size(ctx->path_list) == 0) {
- flb_plg_error(ctx->ins, "no input 'path' was given");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* Config: seconds interval before to re-scan the path */
- tmp = flb_input_get_property("refresh_interval", ins);
- if (!tmp) {
- ctx->refresh_interval_sec = FLB_TAIL_REFRESH;
- ctx->refresh_interval_nsec = 0;
- }
- else {
- ret = flb_utils_time_split(tmp, &sec, &nsec);
- if (ret == 0) {
- ctx->refresh_interval_sec = sec;
- ctx->refresh_interval_nsec = nsec;
-
- if (sec == 0 && nsec == 0) {
- flb_plg_error(ctx->ins, "invalid 'refresh_interval' config "
- "value (%s)", tmp);
- flb_free(ctx);
- return NULL;
- }
-
- if (sec == 0 && nsec <= 1000000) {
- flb_plg_warn(ctx->ins, "very low refresh_interval "
- "(%i.%lu nanoseconds) might cause high CPU usage",
- sec, nsec);
- }
- }
- else {
- flb_plg_error(ctx->ins,
- "invalid 'refresh_interval' config value (%s)",
- tmp);
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-
- /* Config: seconds interval to monitor file after rotation */
- if (ctx->rotate_wait <= 0) {
- flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value");
- flb_free(ctx);
- return NULL;
- }
-
-#ifdef FLB_HAVE_PARSER
- /* Config: multi-line support */
- if (ctx->multiline == FLB_TRUE) {
- ret = flb_tail_mult_create(ctx, ins, config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-#endif
-
- /* Config: Docker mode */
- if(ctx->docker_mode == FLB_TRUE) {
- ret = flb_tail_dmode_create(ctx, ins, config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-
- /* Validate buffer limit */
- if (ctx->buf_chunk_size > ctx->buf_max_size) {
- flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk");
- flb_free(ctx);
- return NULL;
- }
-
-#ifdef FLB_HAVE_REGEX
- /* Parser / Format */
- tmp = flb_input_get_property("parser", ins);
- if (tmp) {
- ctx->parser = flb_parser_get(tmp, config);
- if (!ctx->parser) {
- flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
- }
- }
-#endif
-
- mk_list_init(&ctx->files_static);
- mk_list_init(&ctx->files_event);
- mk_list_init(&ctx->files_rotated);
-
- /* hash table for files lookups */
- ctx->static_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0);
- if (!ctx->static_hash) {
- flb_plg_error(ctx->ins, "could not create static hash");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- ctx->event_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0);
- if (!ctx->event_hash) {
- flb_plg_error(ctx->ins, "could not create event hash");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
-#ifdef FLB_HAVE_SQLDB
- ctx->db = NULL;
-#endif
-
-#ifdef FLB_HAVE_REGEX
- tmp = flb_input_get_property("tag_regex", ins);
- if (tmp) {
- ctx->tag_regex = flb_regex_create(tmp);
- if (ctx->tag_regex) {
- ctx->dynamic_tag = FLB_TRUE;
- }
- else {
- flb_plg_error(ctx->ins, "invalid 'tag_regex' config value");
- }
- }
- else {
- ctx->tag_regex = NULL;
- }
-#endif
-
- /* Check if it should use dynamic tags */
- tmp = strchr(ins->tag, '*');
- if (tmp) {
- ctx->dynamic_tag = FLB_TRUE;
- }
-
-#ifdef FLB_HAVE_SQLDB
- /* Database options (needs to be set before the context) */
- tmp = flb_input_get_property("db.sync", ins);
- if (tmp) {
- if (strcasecmp(tmp, "extra") == 0) {
- ctx->db_sync = 3;
- }
- else if (strcasecmp(tmp, "full") == 0) {
- ctx->db_sync = 2;
- }
- else if (strcasecmp(tmp, "normal") == 0) {
- ctx->db_sync = 1;
- }
- else if (strcasecmp(tmp, "off") == 0) {
- ctx->db_sync = 0;
- }
- else {
- flb_plg_error(ctx->ins, "invalid database 'db.sync' value");
- }
- }
-
- /* Initialize database */
- tmp = flb_input_get_property("db", ins);
- if (tmp) {
- ctx->db = flb_tail_db_open(tmp, ins, ctx, config);
- if (!ctx->db) {
- flb_plg_error(ctx->ins, "could not open/create database");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-
- /* Journal mode check */
- tmp = flb_input_get_property("db.journal_mode", ins);
- if (tmp) {
- if (strcasecmp(tmp, "DELETE") != 0 &&
- strcasecmp(tmp, "TRUNCATE") != 0 &&
- strcasecmp(tmp, "PERSIST") != 0 &&
- strcasecmp(tmp, "MEMORY") != 0 &&
- strcasecmp(tmp, "WAL") != 0 &&
- strcasecmp(tmp, "OFF") != 0) {
-
- flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp);
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- }
-
- /* Prepare Statement */
- if (ctx->db) {
- /* SQL_GET_FILE */
- ret = sqlite3_prepare_v2(ctx->db->handler,
- SQL_GET_FILE,
- -1,
- &ctx->stmt_get_file,
- 0);
- if (ret != SQLITE_OK) {
- flb_plg_error(ctx->ins, "error preparing database SQL statement");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* SQL_INSERT_FILE */
- ret = sqlite3_prepare_v2(ctx->db->handler,
- SQL_INSERT_FILE,
- -1,
- &ctx->stmt_insert_file,
- 0);
- if (ret != SQLITE_OK) {
- flb_plg_error(ctx->ins, "error preparing database SQL statement");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* SQL_ROTATE_FILE */
- ret = sqlite3_prepare_v2(ctx->db->handler,
- SQL_ROTATE_FILE,
- -1,
- &ctx->stmt_rotate_file,
- 0);
- if (ret != SQLITE_OK) {
- flb_plg_error(ctx->ins, "error preparing database SQL statement");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* SQL_UPDATE_OFFSET */
- ret = sqlite3_prepare_v2(ctx->db->handler,
- SQL_UPDATE_OFFSET,
- -1,
- &ctx->stmt_offset,
- 0);
- if (ret != SQLITE_OK) {
- flb_plg_error(ctx->ins, "error preparing database SQL statement");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* SQL_DELETE_FILE */
- ret = sqlite3_prepare_v2(ctx->db->handler,
- SQL_DELETE_FILE,
- -1,
- &ctx->stmt_delete_file,
- 0);
- if (ret != SQLITE_OK) {
- flb_plg_error(ctx->ins, "error preparing database SQL statement");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- }
-#endif
-
-#ifdef FLB_HAVE_PARSER
- /* Multiline core API */
- if (ctx->multiline_parsers && mk_list_size(ctx->multiline_parsers) > 0) {
- ret = multiline_load_parsers(ctx);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "could not load multiline parsers");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
-
- /* Enable auto-flush routine */
- ret = flb_ml_auto_flush_init(ctx->ml_ctx);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "could not start multiline auto-flush");
- flb_tail_config_destroy(ctx);
- return NULL;
- }
- flb_plg_info(ctx->ins, "multiline core started");
- }
-#endif
-
-#ifdef FLB_HAVE_METRICS
- ctx->cmt_files_opened = cmt_counter_create(ins->cmt,
- "fluentbit", "input",
- "files_opened_total",
- "Total number of opened files",
- 1, (char *[]) {"name"});
-
- ctx->cmt_files_closed = cmt_counter_create(ins->cmt,
- "fluentbit", "input",
- "files_closed_total",
- "Total number of closed files",
- 1, (char *[]) {"name"});
-
- ctx->cmt_files_rotated = cmt_counter_create(ins->cmt,
- "fluentbit", "input",
- "files_rotated_total",
- "Total number of rotated files",
- 1, (char *[]) {"name"});
-
- /* OLD metrics */
- flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
- "files_opened", ctx->ins->metrics);
- flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED,
- "files_closed", ctx->ins->metrics);
- flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED,
- "files_rotated", ctx->ins->metrics);
-#endif
-
- return ctx;
-}
-
-int flb_tail_config_destroy(struct flb_tail_config *config)
-{
-
-#ifdef FLB_HAVE_PARSER
- flb_tail_mult_destroy(config);
-
- if (config->ml_ctx) {
- flb_ml_destroy(config->ml_ctx);
- }
-#endif
-
- /* Close pipe ends */
- flb_pipe_close(config->ch_manager[0]);
- flb_pipe_close(config->ch_manager[1]);
- flb_pipe_close(config->ch_pending[0]);
- flb_pipe_close(config->ch_pending[1]);
-
-#ifdef FLB_HAVE_REGEX
- if (config->tag_regex) {
- flb_regex_destroy(config->tag_regex);
- }
-#endif
-
-#ifdef FLB_HAVE_SQLDB
- if (config->db != NULL) {
- sqlite3_finalize(config->stmt_get_file);
- sqlite3_finalize(config->stmt_insert_file);
- sqlite3_finalize(config->stmt_delete_file);
- sqlite3_finalize(config->stmt_rotate_file);
- sqlite3_finalize(config->stmt_offset);
- flb_tail_db_close(config->db);
- }
-#endif
-
- if (config->static_hash) {
- flb_hash_table_destroy(config->static_hash);
- }
- if (config->event_hash) {
- flb_hash_table_destroy(config->event_hash);
- }
-
- flb_free(config);
- return 0;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_config.h b/fluent-bit/plugins/in_tail/tail_config.h
deleted file mode 100644
index dcfa54e02..000000000
--- a/fluent-bit/plugins/in_tail/tail_config.h
+++ /dev/null
@@ -1,168 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_CONFIG_H
-#define FLB_TAIL_CONFIG_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_parser.h>
-#include <fluent-bit/flb_macros.h>
-#include <fluent-bit/flb_sqldb.h>
-#include <fluent-bit/flb_metrics.h>
-#include <fluent-bit/flb_log_event.h>
-#ifdef FLB_HAVE_REGEX
-#include <fluent-bit/flb_regex.h>
-#endif
-#ifdef FLB_HAVE_PARSER
-#include <fluent-bit/multiline/flb_ml.h>
-#endif
-
-#include <xxhash.h>
-
-/* Metrics */
-#ifdef FLB_HAVE_METRICS
-#define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */
-#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
-#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
-#endif
-
-struct flb_tail_config {
- int fd_notify; /* inotify fd */
- flb_pipefd_t ch_manager[2]; /* pipe: channel manager */
- flb_pipefd_t ch_pending[2]; /* pipe: pending events */
- int ch_reads; /* count number if signal reads */
- int ch_writes; /* count number of signal writes */
-
- /* Buffer Config */
- size_t buf_chunk_size; /* allocation chunks */
- size_t buf_max_size; /* max size of a buffer */
-
- /* Static files processor */
- size_t static_batch_size;
-
- /* Event files processor */
- size_t event_batch_size;
-
- /* Collectors */
- int coll_fd_static;
- int coll_fd_scan;
- int coll_fd_watcher;
- int coll_fd_rotated;
- int coll_fd_pending;
- int coll_fd_inactive;
- int coll_fd_dmode_flush;
- int coll_fd_mult_flush;
- int coll_fd_progress_check;
-
- /* Backend collectors */
- int coll_fd_fs1; /* used by fs_inotify & fs_stat */
- int coll_fd_fs2; /* only used by fs_stat */
-
- /* Configuration */
- int dynamic_tag; /* dynamic tag ? e.g: abc.* */
-#ifdef FLB_HAVE_REGEX
- struct flb_regex *tag_regex;/* path to tag regex */
-#endif
- int refresh_interval_sec; /* seconds to re-scan */
- long refresh_interval_nsec;/* nanoseconds to re-scan */
- int read_from_head; /* read new files from head */
- int rotate_wait; /* sec to wait on rotated files */
- int watcher_interval; /* watcher interval */
- int ignore_older; /* ignore fields older than X seconds */
- time_t last_pending; /* last time a 'pending signal' was emitted' */
- struct mk_list *path_list; /* list of paths to scan (glob) */
- flb_sds_t path_key; /* key name of file path */
- flb_sds_t key; /* key for unstructured record */
- int skip_long_lines; /* skip long lines */
- int skip_empty_lines; /* skip empty lines (off) */
- int exit_on_eof; /* exit fluent-bit on EOF, test */
-
- int progress_check_interval; /* watcher interval */
- int progress_check_interval_nsec; /* watcher interval */
-
-#ifdef FLB_HAVE_INOTIFY
- int inotify_watcher; /* enable/disable inotify monitor */
-#endif
- flb_sds_t offset_key; /* key name of file offset */
-
- /* Database */
-#ifdef FLB_HAVE_SQLDB
- struct flb_sqldb *db;
- int db_sync;
- int db_locking;
- flb_sds_t db_journal_mode;
- sqlite3_stmt *stmt_get_file;
- sqlite3_stmt *stmt_insert_file;
- sqlite3_stmt *stmt_delete_file;
- sqlite3_stmt *stmt_rotate_file;
- sqlite3_stmt *stmt_offset;
-#endif
-
- /* Parser / Format */
- struct flb_parser *parser;
-
- /* Multiline */
- int multiline; /* multiline enabled ? */
- int multiline_flush; /* multiline flush/wait */
- struct flb_parser *mult_parser_firstline;
- struct mk_list mult_parsers;
-
- /* Docker mode */
- int docker_mode; /* Docker mode enabled ? */
- int docker_mode_flush; /* Docker mode flush/wait */
- struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */
-
- /* Multiline core engine */
- struct flb_ml *ml_ctx;
- struct mk_list *multiline_parsers;
-
- uint64_t files_static_count; /* number of items in the static file list */
- struct mk_list files_static;
- struct mk_list files_event;
-
- /* List of rotated files that needs to be removed after 'rotate_wait' */
- struct mk_list files_rotated;
-
- /* List of shell patterns used to exclude certain file names */
- struct mk_list *exclude_list;
-
- /* Plugin input instance */
- struct flb_input_instance *ins;
-
- struct flb_log_event_encoder log_event_encoder;
- struct flb_log_event_decoder log_event_decoder;
-
- /* Metrics */
- struct cmt_counter *cmt_files_opened;
- struct cmt_counter *cmt_files_closed;
- struct cmt_counter *cmt_files_rotated;
-
- /* Hash: hash tables for quick acess to registered files */
- struct flb_hash_table *static_hash;
- struct flb_hash_table *event_hash;
-
- struct flb_config *config;
-};
-
-struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
- struct flb_config *config);
-int flb_tail_config_destroy(struct flb_tail_config *config);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_db.c b/fluent-bit/plugins/in_tail/tail_db.c
deleted file mode 100644
index 664963b6d..000000000
--- a/fluent-bit/plugins/in_tail/tail_db.c
+++ /dev/null
@@ -1,277 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_sqldb.h>
-
-#include "tail_db.h"
-#include "tail_sql.h"
-#include "tail_file.h"
-
-struct query_status {
- int id;
- int rows;
- int64_t offset;
-};
-
-/* Open or create database required by tail plugin */
-struct flb_sqldb *flb_tail_db_open(const char *path,
- struct flb_input_instance *in,
- struct flb_tail_config *ctx,
- struct flb_config *config)
-{
- int ret;
- char tmp[64];
- struct flb_sqldb *db;
-
- /* Open/create the database */
- db = flb_sqldb_open(path, in->name, config);
- if (!db) {
- return NULL;
- }
-
- /* Create table schema if it don't exists */
- ret = flb_sqldb_query(db, SQL_CREATE_FILES, NULL, NULL);
- if (ret != FLB_OK) {
- flb_plg_error(ctx->ins, "db: could not create 'in_tail_files' table");
- flb_sqldb_close(db);
- return NULL;
- }
-
- if (ctx->db_sync >= 0) {
- snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
- ctx->db_sync);
- ret = flb_sqldb_query(db, tmp, NULL, NULL);
- if (ret != FLB_OK) {
- flb_plg_error(ctx->ins, "db could not set pragma 'sync'");
- flb_sqldb_close(db);
- return NULL;
- }
- }
-
- if (ctx->db_locking == FLB_TRUE) {
- ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL);
- if (ret != FLB_OK) {
- flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'");
- flb_sqldb_close(db);
- return NULL;
- }
- }
-
- if (ctx->db_journal_mode) {
- snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE,
- ctx->db_journal_mode);
- ret = flb_sqldb_query(db, tmp, NULL, NULL);
- if (ret != FLB_OK) {
- flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'");
- flb_sqldb_close(db);
- return NULL;
- }
- }
-
- return db;
-}
-
-int flb_tail_db_close(struct flb_sqldb *db)
-{
- flb_sqldb_close(db);
- return 0;
-}
-
-/*
- * Check if an file inode exists in the database. Return FLB_TRUE or
- * FLB_FALSE
- */
-static int db_file_exists(struct flb_tail_file *file,
- struct flb_tail_config *ctx,
- uint64_t *id, uint64_t *inode, off_t *offset)
-{
- int ret;
- int exists = FLB_FALSE;
-
- /* Bind parameters */
- sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode);
- ret = sqlite3_step(ctx->stmt_get_file);
-
- if (ret == SQLITE_ROW) {
- exists = FLB_TRUE;
-
- /* id: column 0 */
- *id = sqlite3_column_int64(ctx->stmt_get_file, 0);
-
- /* offset: column 2 */
- *offset = sqlite3_column_int64(ctx->stmt_get_file, 2);
-
- /* inode: column 3 */
- *inode = sqlite3_column_int64(ctx->stmt_get_file, 3);
- }
- else if (ret == SQLITE_DONE) {
- /* all good */
- }
- else {
- exists = -1;
- }
-
- sqlite3_clear_bindings(ctx->stmt_get_file);
- sqlite3_reset(ctx->stmt_get_file);
-
- return exists;
-
-}
-
-static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ctx)
-
-{
- int ret;
- time_t created;
-
- /* Register the file */
- created = time(NULL);
-
- /* Bind parameters */
- sqlite3_bind_text(ctx->stmt_insert_file, 1, file->name, -1, 0);
- sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
- sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
- sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
-
- /* Run the insert */
- ret = sqlite3_step(ctx->stmt_insert_file);
- if (ret != SQLITE_DONE) {
- sqlite3_clear_bindings(ctx->stmt_insert_file);
- sqlite3_reset(ctx->stmt_insert_file);
- flb_plg_error(ctx->ins, "cannot execute insert file %s inode=%lu",
- file->name, file->inode);
- return -1;
- }
-
- sqlite3_clear_bindings(ctx->stmt_insert_file);
- sqlite3_reset(ctx->stmt_insert_file);
-
- /* Get the database ID for this file */
- return flb_sqldb_last_id(ctx->db);
-}
-
-int flb_tail_db_file_set(struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- int ret;
- uint64_t id = 0;
- off_t offset = 0;
- uint64_t inode = 0;
-
- /* Check if the file exists */
- ret = db_file_exists(file, ctx, &id, &inode, &offset);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "cannot execute query to check inode: %lu",
- file->inode);
- return -1;
- }
-
- if (ret == FLB_FALSE) {
- /* Get the database ID for this file */
- file->db_id = db_file_insert(file, ctx);
- }
- else {
- file->db_id = id;
- file->offset = offset;
- }
-
- return 0;
-}
-
-/* Update Offset v2 */
-int flb_tail_db_file_offset(struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- int ret;
-
- /* Bind parameters */
- sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
- sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
-
- ret = sqlite3_step(ctx->stmt_offset);
-
- if (ret != SQLITE_DONE) {
- sqlite3_clear_bindings(ctx->stmt_offset);
- sqlite3_reset(ctx->stmt_offset);
- return -1;
- }
-
- /* Verify number of updated rows */
- ret = sqlite3_changes(ctx->db->handler);
- if (ret == 0) {
- /*
- * 'someone' like you 'the reader' or another user has deleted the database
- * entry, just restore it.
- */
- file->db_id = db_file_insert(file, ctx);
- }
-
- sqlite3_clear_bindings(ctx->stmt_offset);
- sqlite3_reset(ctx->stmt_offset);
-
- return 0;
-}
-
-/* Mark a file as rotated v2 */
-int flb_tail_db_file_rotate(const char *new_name,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- int ret;
-
- /* Bind parameters */
- sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0);
- sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id);
-
- ret = sqlite3_step(ctx->stmt_rotate_file);
-
- sqlite3_clear_bindings(ctx->stmt_rotate_file);
- sqlite3_reset(ctx->stmt_rotate_file);
-
- if (ret != SQLITE_DONE) {
- return -1;
- }
-
- return 0;
-}
-
-/* Delete file entry from the database */
-int flb_tail_db_file_delete(struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- int ret;
-
- /* Bind parameters */
- sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id);
- ret = sqlite3_step(ctx->stmt_delete_file);
-
- sqlite3_clear_bindings(ctx->stmt_delete_file);
- sqlite3_reset(ctx->stmt_delete_file);
-
- if (ret != SQLITE_DONE) {
- flb_plg_error(ctx->ins, "db: error deleting entry from database: %s",
- file->name);
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name);
- return 0;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_db.h b/fluent-bit/plugins/in_tail/tail_db.h
deleted file mode 100644
index 7b5355d22..000000000
--- a/fluent-bit/plugins/in_tail/tail_db.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_DB_H
-#define FLB_TAIL_DB_H
-
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_sqldb.h>
-
-#include "tail_file.h"
-
-struct flb_sqldb *flb_tail_db_open(const char *path,
- struct flb_input_instance *in,
- struct flb_tail_config *ctx,
- struct flb_config *config);
-
-int flb_tail_db_close(struct flb_sqldb *db);
-int flb_tail_db_file_set(struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-int flb_tail_db_file_offset(struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-int flb_tail_db_file_rotate(const char *new_name,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-int flb_tail_db_file_delete(struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.c b/fluent-bit/plugins/in_tail/tail_dockermode.c
deleted file mode 100644
index a8f20f9cd..000000000
--- a/fluent-bit/plugins/in_tail/tail_dockermode.c
+++ /dev/null
@@ -1,459 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_unescape.h>
-
-#include "tail_config.h"
-#include "tail_dockermode.h"
-#include "tail_file_internal.h"
-
-int flb_tail_dmode_create(struct flb_tail_config *ctx,
- struct flb_input_instance *ins,
- struct flb_config *config)
-{
- const char *tmp;
-
- if (ctx->multiline == FLB_TRUE) {
- flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline "
- "is enabled");
- return -1;
- }
-
-#ifdef FLB_HAVE_REGEX
- /* First line Parser */
- tmp = flb_input_get_property("docker_mode_parser", ins);
- if (tmp) {
- ctx->docker_mode_parser = flb_parser_get(tmp, config);
- if (!ctx->docker_mode_parser) {
- flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
- }
- }
- else {
- ctx->docker_mode_parser = NULL;
- }
-#endif
-
- tmp = flb_input_get_property("docker_mode_flush", ins);
- if (!tmp) {
- ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH;
- }
- else {
- ctx->docker_mode_flush = atoi(tmp);
- if (ctx->docker_mode_flush <= 0) {
- ctx->docker_mode_flush = 1;
- }
- }
-
- return 0;
-}
-
-static int modify_json_cond(char *js, size_t js_len,
- char **val, size_t *val_len,
- char **out, size_t *out_len,
- int cond(char*, size_t),
- int mod(char*, size_t, char**, size_t*, void*), void *data)
-{
- int ret;
- struct flb_pack_state state;
- jsmntok_t *t;
- jsmntok_t *t_val = NULL;
- int i;
- int i_root = -1;
- int i_key = -1;
- char *old_val;
- size_t old_val_len;
- char *new_val = NULL;
- size_t new_val_len = 0;
- size_t mod_len;
-
- ret = flb_pack_state_init(&state);
- if (ret != 0) {
- ret = -1;
- goto modify_json_cond_end;
- }
-
- ret = flb_json_tokenise(js, js_len, &state);
- if (ret != 0 || state.tokens_count == 0) {
- ret = -1;
- goto modify_json_cond_end;
- }
-
- for (i = 0; i < state.tokens_count; i++) {
- t = &state.tokens[i];
-
- if (i_key >= 0) {
- if (t->parent == i_key) {
- if (t->type == JSMN_STRING) {
- t_val = t;
- }
- break;
- }
- continue;
- }
-
- if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) {
- i_root = i;
- continue;
- }
- if (i_root == -1) {
- continue;
- }
-
- if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) {
- i_key = i;
- }
- }
-
- if (!t_val) {
- ret = -1;
- goto modify_json_cond_end;
- }
-
- *out = js;
- *out_len = js_len;
-
- if (val) {
- *val = js + t_val->start;
- }
- if (val_len) {
- *val_len = t_val->end - t_val->start;
- }
-
- if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) {
- old_val = js + t_val->start;
- old_val_len = t_val->end - t_val->start;
- ret = mod(old_val, old_val_len, &new_val, &new_val_len, data);
- if (ret != 0) {
- ret = -1;
- goto modify_json_cond_end;
- }
-
- ret = 1;
-
- if (new_val == old_val) {
- goto modify_json_cond_end;
- }
-
- mod_len = js_len + new_val_len - old_val_len;
- *out = flb_malloc(mod_len);
- if (!*out) {
- flb_errno();
- flb_free(new_val);
- ret = -1;
- goto modify_json_cond_end;
- }
- *out_len = mod_len;
-
- memcpy(*out, js, t_val->start);
- memcpy(*out + t_val->start, new_val, new_val_len);
- memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end);
-
- flb_free(new_val);
- }
-
- modify_json_cond_end:
- flb_pack_state_reset(&state);
- if (ret < 0) {
- *out = NULL;
- }
- return ret;
-}
-
-static int unesc_ends_with_nl(char *str, size_t len)
-{
- char* unesc;
- int unesc_len;
- int nl;
-
- unesc = flb_malloc(len + 1);
- if (!unesc) {
- flb_errno();
- return FLB_FALSE;
- }
- unesc_len = flb_unescape_string(str, len, &unesc);
- nl = unesc[unesc_len - 1] == '\n';
- flb_free(unesc);
- return nl;
-}
-
-static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data)
-{
- flb_sds_t sds = data;
-
- if (flb_sds_len(sds) == 0) {
- *out = str;
- *out_len = len;
- return 0;
- }
-
- size_t mod_len = flb_sds_len(sds) + len;
- *out = flb_malloc(mod_len);
- if (!*out) {
- flb_errno();
- return -1;
- }
- *out_len = mod_len;
-
- memcpy(*out, sds, flb_sds_len(sds));
- memcpy(*out + flb_sds_len(sds), str, len);
- return 0;
-}
-
-static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data)
-{
- flb_sds_t sds = data;
- size_t mod_len = flb_sds_len(sds);
- *out = flb_malloc(mod_len);
- if (!*out) {
- flb_errno();
- return -1;
- }
- *out_len = mod_len;
-
- memcpy(*out, sds, flb_sds_len(sds));
- return 0;
-}
-
-int flb_tail_dmode_process_content(time_t now,
- char* line, size_t line_len,
- char **repl_line, size_t *repl_line_len,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- char* val = NULL;
- size_t val_len;
- int ret;
- void *out_buf = NULL;
- size_t out_size;
- struct flb_time out_time = {0};
- *repl_line = NULL;
- *repl_line_len = 0;
- flb_sds_t tmp;
- flb_sds_t tmp_copy;
-
-#ifdef FLB_HAVE_REGEX
- if (ctx->docker_mode_parser) {
- ret = flb_parser_do(ctx->docker_mode_parser, line, line_len,
- &out_buf, &out_size, &out_time);
- flb_free(out_buf);
-
- /*
- * Set dmode_firstline if the line meets the first-line requirement
- */
- if(ret >= 0) {
- file->dmode_firstline = true;
- }
-
- /*
- * Process if buffer contains full log line
- */
- if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) {
- /*
- * Buffered log should be flushed out
- * as current line meets first-line requirement
- */
- if(ret >= 0) {
- flb_tail_dmode_flush(file, ctx);
- }
-
- /*
- * Flush the buffer if multiline has not been detected yet
- */
- if (!file->dmode_firstline) {
- flb_tail_dmode_flush(file, ctx);
- }
- }
- }
-#endif
-
- ret = modify_json_cond(line, line_len,
- &val, &val_len,
- repl_line, repl_line_len,
- unesc_ends_with_nl,
- prepend_sds_to_str, file->dmode_buf);
- if (ret >= 0) {
- /* line is a valid json */
- flb_sds_len_set(file->dmode_lastline, 0);
-
- /* concatenate current log line with buffered one */
- tmp = flb_sds_cat(file->dmode_buf, val, val_len);
- if (!tmp) {
- flb_errno();
- return -1;
- }
- file->dmode_buf = tmp;
-
- tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len);
- if (!tmp_copy) {
- flb_errno();
- return -1;
- }
-
- file->dmode_lastline = tmp_copy;
- file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);
-
- if (ret == 0) {
- /* Line not ended with newline */
- file->dmode_complete = false;
- }
- else {
- /* Line ended with newline */
- file->dmode_complete = true;
-#ifdef FLB_HAVE_REGEX
- if (!ctx->docker_mode_parser) {
- flb_tail_dmode_flush(file, ctx);
- }
-#else
- flb_tail_dmode_flush(file, ctx);
-#endif
- }
- }
- return ret;
-}
-
-void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx)
-{
- int ret;
- char *repl_line = NULL;
- size_t repl_line_len = 0;
- void *out_buf = NULL;
- size_t out_size;
- struct flb_time out_time = {0};
- time_t now = time(NULL);
-
- if (flb_sds_len(file->dmode_lastline) == 0) {
- return;
- }
-
- flb_time_zero(&out_time);
-
- ret = modify_json_cond(file->dmode_lastline,
- flb_sds_len(file->dmode_lastline),
- NULL, NULL,
- &repl_line, &repl_line_len,
- NULL,
- use_sds, file->dmode_buf);
- if (ret < 0) {
- return;
- }
-
- flb_sds_len_set(file->dmode_buf, 0);
- flb_sds_len_set(file->dmode_lastline, 0);
- file->dmode_flush_timeout = 0;
-
-#ifdef FLB_HAVE_REGEX
- if (ctx->parser) {
- ret = flb_parser_do(ctx->parser, repl_line, repl_line_len,
- &out_buf, &out_size, &out_time);
- if (ret >= 0) {
- if (flb_time_to_double(&out_time) == 0) {
- flb_time_get(&out_time);
- }
- if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) {
- goto dmode_flush_end;
- }
-
- flb_tail_pack_line_map(&out_time, (char**) &out_buf, &out_size, file, 0);
-
- goto dmode_flush_end;
- }
- }
-#endif
-
- flb_tail_file_pack_line(NULL, repl_line, repl_line_len, file, 0);
-
- dmode_flush_end:
- flb_free(repl_line);
- flb_free(out_buf);
-}
-
-static void file_pending_flush(struct flb_tail_config *ctx,
- struct flb_tail_file *file, time_t now)
-{
- if (file->dmode_flush_timeout > now) {
- return;
- }
-
- if (flb_sds_len(file->dmode_lastline) == 0) {
- return;
- }
-
- flb_tail_dmode_flush(file, ctx);
-
- if (file->sl_log_event_encoder->output_length > 0) {
- flb_input_log_append(ctx->ins,
- file->tag_buf,
- file->tag_len,
- file->sl_log_event_encoder->output_buffer,
- file->sl_log_event_encoder->output_length);
-
- flb_log_event_encoder_reset(file->sl_log_event_encoder);
- }
-}
-
-int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx)
-{
- time_t expired;
- struct mk_list *head;
- struct flb_tail_file *file;
-
- expired = time(NULL) + 3600;
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, expired);
- }
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, expired);
- }
-
- return 0;
-}
-
-int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- time_t now;
- struct mk_list *head;
- struct flb_tail_file *file;
- struct flb_tail_config *ctx = context;
-
- now = time(NULL);
-
- /* Iterate static event files with pending bytes */
- mk_list_foreach(head, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, now);
- }
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, now);
- }
-
- return 0;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.h b/fluent-bit/plugins/in_tail/tail_dockermode.h
deleted file mode 100644
index 50869ff62..000000000
--- a/fluent-bit/plugins/in_tail/tail_dockermode.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_DOCKERMODE_H
-#define FLB_TAIL_DOCKERMODE_H
-
-#include "tail_file.h"
-#define FLB_TAIL_DMODE_FLUSH 4
-
-int flb_tail_dmode_create(struct flb_tail_config *ctx,
- struct flb_input_instance *ins, struct flb_config *config);
-int flb_tail_dmode_process_content(time_t now,
- char* line, size_t line_len,
- char **repl_line, size_t *repl_line_len,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx);
-int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
- struct flb_config *config, void *context);
-int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c
deleted file mode 100644
index 2385f0626..000000000
--- a/fluent-bit/plugins/in_tail/tail_file.c
+++ /dev/null
@@ -1,1860 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <time.h>
-#ifdef FLB_SYSTEM_FREEBSD
-#include <sys/user.h>
-#include <libutil.h>
-#endif
-
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_parser.h>
-#ifdef FLB_HAVE_REGEX
-#include <fluent-bit/flb_regex.h>
-#include <fluent-bit/flb_hash_table.h>
-#endif
-
-#include "tail.h"
-#include "tail_file.h"
-#include "tail_config.h"
-#include "tail_db.h"
-#include "tail_signal.h"
-#include "tail_dockermode.h"
-#include "tail_multiline.h"
-#include "tail_scan.h"
-
-#ifdef FLB_SYSTEM_WINDOWS
-#include "win32.h"
-#endif
-
-#include <cfl/cfl.h>
-
-static inline void consume_bytes(char *buf, int bytes, int length)
-{
- memmove(buf, buf + bytes, length - bytes);
-}
-
-static uint64_t stat_get_st_dev(struct stat *st)
-{
-#ifdef FLB_SYSTEM_WINDOWS
- /* do you want to contribute with a way to extract volume serial number ? */
- return 0;
-#else
- return st->st_dev;
-#endif
-}
-
-static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st,
- uint64_t *out_hash)
-{
- int len;
- uint64_t st_dev;
- char tmp[64];
-
- st_dev = stat_get_st_dev(st);
-
- len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64,
- st_dev, (uint64_t)st->st_ino);
-
- *out_hash = cfl_hash_64bits(tmp, len);
- return 0;
-}
-
-static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st,
- flb_sds_t *key)
-{
- uint64_t st_dev;
- flb_sds_t tmp;
- flb_sds_t buf;
-
- buf = flb_sds_create_size(64);
- if (!buf) {
- return -1;
- }
-
- st_dev = stat_get_st_dev(st);
- tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64,
- st_dev, (uint64_t)st->st_ino);
- if (!tmp) {
- flb_sds_destroy(buf);
- return -1;
- }
-
- *key = buf;
- return 0;
-}
-
-/* Append custom keys and report the number of records processed */
-static int record_append_custom_keys(struct flb_tail_file *file,
- char *in_data, size_t in_size,
- char **out_data, size_t *out_size)
-{
- int i;
- int ret;
- int records = 0;
- msgpack_object k;
- msgpack_object v;
- struct flb_log_event event;
- struct flb_tail_config *ctx;
- struct flb_log_event_encoder encoder;
- struct flb_log_event_decoder decoder;
-
- ctx = (struct flb_tail_config *) file->config;
-
- ret = flb_log_event_decoder_init(&decoder, in_data, in_size);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- return -1;
- }
-
- ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- flb_log_event_decoder_destroy(&decoder);
-
- return -2;
- }
-
- while (flb_log_event_decoder_next(&decoder, &event) ==
- FLB_EVENT_DECODER_SUCCESS) {
-
- ret = flb_log_event_encoder_begin_record(&encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp);
- }
-
- /* append previous map keys */
- for (i = 0; i < event.body->via.map.size; i++) {
- k = event.body->via.map.ptr[i].key;
- v = event.body->via.map.ptr[i].val;
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_msgpack_object(
- &encoder,
- &k);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_msgpack_object(
- &encoder,
- &v);
- }
- }
-
- /* path_key */
- if (ctx->path_key != NULL) {
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_cstring(
- &encoder,
- file->config->path_key);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_cstring(
- &encoder,
- file->orig_name);
- }
- }
-
- /* offset_key */
- if (ctx->offset_key != NULL) {
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_cstring(
- &encoder,
- file->config->offset_key);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_uint64(
- &encoder,
- file->offset +
- file->last_processed_bytes);
- }
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(&encoder);
- }
- else {
- flb_plg_error(file->config->ins, "error packing event : %d", ret);
-
- flb_log_event_encoder_rollback_record(&encoder);
- }
-
- /* counter */
- records++;
- }
-
- *out_data = encoder.output_buffer;
- *out_size = encoder.output_length;
-
- /* This function transfers ownership of the internal memory allocated by
- * sbuffer using msgpack_sbuffer_release which means the caller is
- * responsible for releasing the memory.
- */
- flb_log_event_encoder_claim_internal_buffer_ownership(&encoder);
-
- flb_log_event_decoder_destroy(&decoder);
- flb_log_event_encoder_destroy(&encoder);
-
- return records;
-}
-
-static int flb_tail_repack_map(struct flb_log_event_encoder *encoder,
- char *data,
- size_t data_size)
-{
- msgpack_unpacked source_map;
- size_t offset;
- int result;
- size_t index;
- msgpack_object value;
- msgpack_object key;
-
- result = FLB_EVENT_ENCODER_SUCCESS;
-
- if (data_size > 0) {
- msgpack_unpacked_init(&source_map);
-
- offset = 0;
- result = msgpack_unpack_next(&source_map,
- data,
- data_size,
- &offset);
-
- if (result == MSGPACK_UNPACK_SUCCESS) {
- result = FLB_EVENT_ENCODER_SUCCESS;
- }
- else {
- result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
- }
-
- for (index = 0;
- index < source_map.data.via.map.size &&
- result == FLB_EVENT_ENCODER_SUCCESS;
- index++) {
- key = source_map.data.via.map.ptr[index].key;
- value = source_map.data.via.map.ptr[index].val;
-
- result = flb_log_event_encoder_append_body_msgpack_object(
- encoder,
- &key);
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_msgpack_object(
- encoder,
- &value);
- }
- }
-
- msgpack_unpacked_destroy(&source_map);
- }
-
- return result;
-}
-
-int flb_tail_pack_line_map(struct flb_time *time, char **data,
- size_t *data_size, struct flb_tail_file *file,
- size_t processed_bytes)
-{
- int result;
-
- result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_set_timestamp(
- file->sl_log_event_encoder, time);
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_tail_repack_map(file->sl_log_event_encoder,
- *data,
- *data_size);
- }
-
- /* path_key */
- if (file->config->path_key != NULL) {
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_values(
- file->sl_log_event_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
- FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
- file->orig_name_len));
- }
- }
-
- /* offset_key */
- if (file->config->offset_key != NULL) {
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_values(
- file->sl_log_event_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
- FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
- }
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
- }
- else {
- flb_log_event_encoder_rollback_record(file->sl_log_event_encoder);
- }
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- flb_plg_error(file->config->ins, "error packing event");
-
- return -1;
- }
-
- return 0;
-}
-
-int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size,
- struct flb_tail_file *file, size_t processed_bytes)
-{
- int result;
-
- result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_set_timestamp(
- file->sl_log_event_encoder, time);
- }
-
- /* path_key */
- if (file->config->path_key != NULL) {
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_values(
- file->sl_log_event_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
- FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
- file->orig_name_len));
- }
- }
-
- /* offset_key */
- if (file->config->offset_key != NULL) {
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_values(
- file->sl_log_event_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
- FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
- }
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_append_body_values(
- file->sl_log_event_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(file->config->key),
- FLB_LOG_EVENT_STRING_VALUE(data,
- data_size));
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
- }
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- flb_plg_error(file->config->ins, "error packing event : %d", result);
-
- return -1;
- }
-
- return 0;
-}
-
-static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size)
-{
- int result;
-
- result = flb_log_event_encoder_emit_raw_record(
- file->ml_log_event_encoder,
- buf_data, buf_size);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- flb_plg_error(file->config->ins,
- "log event raw append error : %d",
- result);
-
- return -1;
- }
-
- return 0;
-}
-
-static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file)
-{
- if (file->ml_log_event_encoder->output_length > 0) {
- flb_input_log_append(ctx->ins,
- file->tag_buf,
- file->tag_len,
- file->ml_log_event_encoder->output_buffer,
- file->ml_log_event_encoder->output_length);
-
- flb_log_event_encoder_reset(file->ml_log_event_encoder);
- }
-
- return 0;
-}
-
-static int process_content(struct flb_tail_file *file, size_t *bytes)
-{
- size_t len;
- int lines = 0;
- int ret;
- size_t processed_bytes = 0;
- char *data;
- char *end;
- char *p;
- void *out_buf;
- size_t out_size;
- int crlf;
- char *line;
- size_t line_len;
- char *repl_line;
- size_t repl_line_len;
- time_t now = time(NULL);
- struct flb_time out_time = {0};
- struct flb_tail_config *ctx;
-
- ctx = (struct flb_tail_config *) file->config;
-
- /* Parse the data content */
- data = file->buf_data;
- end = data + file->buf_len;
-
- /* reset last processed bytes */
- file->last_processed_bytes = 0;
-
- /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */
- while (data < end && *data == '\0') {
- data++;
- processed_bytes++;
- }
-
- while (data < end && (p = memchr(data, '\n', end - data))) {
- len = (p - data);
- crlf = 0;
- if (file->skip_next == FLB_TRUE) {
- data += len + 1;
- processed_bytes += len + 1;
- file->skip_next = FLB_FALSE;
- continue;
- }
-
- /*
- * Empty line (just breakline)
- * ---------------------------
- * [NOTE] with the new Multiline core feature and Multiline Filter on
- * Fluent Bit v1.8.2, there are a couple of cases where stack traces
- * or multi line patterns expects an empty line (meaning only the
- * breakline), skipping empty lines on this plugin will break that
- * functionality.
- *
- * We are introducing 'skip_empty_lines=off' configuration
- * property to revert this behavior if some user is affected by
- * this change.
- */
-
- if (len == 0 && ctx->skip_empty_lines) {
- data++;
- processed_bytes++;
- continue;
- }
-
- /* Process '\r\n' */
- if (len >= 2) {
- crlf = (data[len-1] == '\r');
- if (len == 1 && crlf) {
- data += 2;
- processed_bytes += 2;
- continue;
- }
- }
-
- /* Reset time for each line */
- flb_time_zero(&out_time);
-
- line = data;
- line_len = len - crlf;
- repl_line = NULL;
-
- if (ctx->ml_ctx) {
- ret = flb_ml_append_text(ctx->ml_ctx,
- file->ml_stream_id,
- &out_time,
- line,
- line_len);
- goto go_next;
- }
- else if (ctx->docker_mode) {
- ret = flb_tail_dmode_process_content(now, line, line_len,
- &repl_line, &repl_line_len,
- file, ctx);
- if (ret >= 0) {
- if (repl_line == line) {
- repl_line = NULL;
- }
- else {
- line = repl_line;
- line_len = repl_line_len;
- }
- /* Skip normal parsers flow */
- goto go_next;
- }
- else {
- flb_tail_dmode_flush(file, ctx);
- }
- }
-
-#ifdef FLB_HAVE_PARSER
- if (ctx->parser) {
- /* Common parser (non-multiline) */
- ret = flb_parser_do(ctx->parser, line, line_len,
- &out_buf, &out_size, &out_time);
- if (ret >= 0) {
- if (flb_time_to_nanosec(&out_time) == 0L) {
- flb_time_get(&out_time);
- }
-
- /* If multiline is enabled, flush any buffered data */
- if (ctx->multiline == FLB_TRUE) {
- flb_tail_mult_flush(file, ctx);
- }
-
- flb_tail_pack_line_map(&out_time,
- (char**) &out_buf, &out_size, file,
- processed_bytes);
-
- flb_free(out_buf);
- }
- else {
- /* Parser failed, pack raw text */
- flb_tail_file_pack_line(NULL, data, len, file, processed_bytes);
- }
- }
- else if (ctx->multiline == FLB_TRUE) {
- ret = flb_tail_mult_process_content(now,
- line, line_len,
- file, ctx, processed_bytes);
-
- /* No multiline */
- if (ret == FLB_TAIL_MULT_NA) {
- flb_tail_mult_flush(file, ctx);
-
- flb_tail_file_pack_line(NULL,
- line, line_len, file, processed_bytes);
- }
- else if (ret == FLB_TAIL_MULT_MORE) {
- /* we need more data, do nothing */
- goto go_next;
- }
- else if (ret == FLB_TAIL_MULT_DONE) {
- /* Finalized */
- }
- }
- else {
- flb_tail_file_pack_line(NULL,
- line, line_len, file, processed_bytes);
- }
-#else
- flb_tail_file_pack_line(NULL,
- line, line_len, file, processed_bytes);
-#endif
-
- go_next:
- flb_free(repl_line);
- repl_line = NULL;
- /* Adjust counters */
- data += len + 1;
- processed_bytes += len + 1;
- lines++;
- file->parsed = 0;
- file->last_processed_bytes += processed_bytes;
- }
- file->parsed = file->buf_len;
-
- if (lines > 0) {
- /* Append buffer content to a chunk */
- *bytes = processed_bytes;
-
- if (file->sl_log_event_encoder->output_length > 0) {
- flb_input_log_append_records(ctx->ins,
- lines,
- file->tag_buf,
- file->tag_len,
- file->sl_log_event_encoder->output_buffer,
- file->sl_log_event_encoder->output_length);
-
- flb_log_event_encoder_reset(file->sl_log_event_encoder);
- }
- }
- else if (file->skip_next) {
- *bytes = file->buf_len;
- }
- else {
- *bytes = processed_bytes;
- }
-
- if (ctx->ml_ctx) {
- ml_stream_buffer_flush(ctx, file);
- }
-
- return lines;
-}
-
-static inline void drop_bytes(char *buf, size_t len, int pos, int bytes)
-{
- memmove(buf + pos,
- buf + pos + bytes,
- len - pos - bytes);
-}
-
-#ifdef FLB_HAVE_REGEX
-static void cb_results(const char *name, const char *value,
- size_t vlen, void *data)
-{
- struct flb_hash_table *ht = data;
-
- if (vlen == 0) {
- return;
- }
-
- flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen);
-}
-#endif
-
-#ifdef FLB_HAVE_REGEX
-static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname,
- char *out_buf, size_t *out_size,
- struct flb_tail_config *ctx)
-#else
-static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size,
- struct flb_tail_config *ctx)
-#endif
-{
- int i;
- size_t len;
- char *p;
- size_t buf_s = 0;
-#ifdef FLB_HAVE_REGEX
- ssize_t n;
- struct flb_regex_search result;
- struct flb_hash_table *ht;
- char *beg;
- char *end;
- int ret;
- const char *tmp;
- size_t tmp_s;
-#endif
-
-#ifdef FLB_HAVE_REGEX
- if (tag_regex) {
- n = flb_regex_do(tag_regex, fname, strlen(fname), &result);
- if (n <= 0) {
- flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s",
- fname);
- return -1;
- }
- else {
- ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
- FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE);
- flb_regex_parse(tag_regex, &result, cb_results, ht);
-
- for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) {
- if (beg != p) {
- len = (beg - p);
- memcpy(out_buf + buf_s, p, len);
- buf_s += len;
- }
-
- beg++;
-
- end = strchr(beg, '>');
- if (end && !memchr(beg, '<', end - beg)) {
- end--;
-
- len = end - beg + 1;
- ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s);
- if (ret != -1) {
- memcpy(out_buf + buf_s, tmp, tmp_s);
- buf_s += tmp_s;
- }
- else {
- memcpy(out_buf + buf_s, "_", 1);
- buf_s++;
- }
- }
- else {
- flb_plg_error(ctx->ins,
- "missing closing angle bracket in tag %s "
- "at position %lu", tag, beg - tag);
- flb_hash_table_destroy(ht);
- return -1;
- }
- }
-
- flb_hash_table_destroy(ht);
- if (*p) {
- len = strlen(p);
- memcpy(out_buf + buf_s, p, len);
- buf_s += len;
- }
- }
- }
- else {
-#endif
- p = strchr(tag, '*');
- if (!p) {
- return -1;
- }
-
- /* Copy tag prefix if any */
- len = (p - tag);
- if (len > 0) {
- memcpy(out_buf, tag, len);
- buf_s += len;
- }
-
- /* Append file name */
- len = strlen(fname);
- memcpy(out_buf + buf_s, fname, len);
- buf_s += len;
-
- /* Tag suffix (if any) */
- p++;
- if (*p) {
- len = strlen(tag);
- memcpy(out_buf + buf_s, p, (len - (p - tag)));
- buf_s += (len - (p - tag));
- }
-
- /* Sanitize buffer */
- for (i = 0; i < buf_s; i++) {
- if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') {
- if (i > 0) {
- out_buf[i] = '.';
- }
- else {
- drop_bytes(out_buf, buf_s, i, 1);
- buf_s--;
- i--;
- }
- }
-
- if (i > 0 && out_buf[i] == '.') {
- if (out_buf[i - 1] == '.') {
- drop_bytes(out_buf, buf_s, i, 1);
- buf_s--;
- i--;
- }
- }
- else if (out_buf[i] == '*') {
- drop_bytes(out_buf, buf_s, i, 1);
- buf_s--;
- i--;
- }
- }
-
- /* Check for an ending '.' */
- if (out_buf[buf_s - 1] == '.') {
- drop_bytes(out_buf, buf_s, buf_s - 1, 1);
- buf_s--;
- }
-#ifdef FLB_HAVE_REGEX
- }
-#endif
-
- out_buf[buf_s] = '\0';
- *out_size = buf_s;
-
- return 0;
-}
-
-static inline int flb_tail_file_exists(struct stat *st,
- struct flb_tail_config *ctx)
-{
- int ret;
- uint64_t hash;
-
- ret = stat_to_hash_bits(ctx, st, &hash);
- if (ret != 0) {
- return -1;
- }
-
- /* static hash */
- if (flb_hash_table_exists(ctx->static_hash, hash)) {
- return FLB_TRUE;
- }
-
- /* event hash */
- if (flb_hash_table_exists(ctx->event_hash, hash)) {
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-/*
- * Based in the configuration or database offset, set the proper 'offset' for the
- * file in question.
- */
-static int set_file_position(struct flb_tail_config *ctx,
- struct flb_tail_file *file)
-{
- int64_t ret;
-
-#ifdef FLB_HAVE_SQLDB
- /*
- * If the database option is enabled, try to gather the file position. The
- * database function updates the file->offset entry.
- */
- if (ctx->db) {
- ret = flb_tail_db_file_set(file, ctx);
- if (ret == 0) {
- if (file->offset > 0) {
- ret = lseek(file->fd, file->offset, SEEK_SET);
- if (ret == -1) {
- flb_errno();
- return -1;
- }
- }
- else if (ctx->read_from_head == FLB_FALSE) {
- ret = lseek(file->fd, 0, SEEK_END);
- if (ret == -1) {
- flb_errno();
- return -1;
- }
- file->offset = ret;
- flb_tail_db_file_offset(file, ctx);
- }
- return 0;
- }
- }
-#endif
-
- if (ctx->read_from_head == FLB_TRUE) {
- /* no need to seek, offset position is already zero */
- return 0;
- }
-
- /* tail... */
- ret = lseek(file->fd, 0, SEEK_END);
- if (ret == -1) {
- flb_errno();
- return -1;
- }
- file->offset = ret;
-
- return 0;
-}
-
-/* Multiline flush callback: invoked every time some content is complete */
-static int ml_flush_callback(struct flb_ml_parser *parser,
- struct flb_ml_stream *mst,
- void *data, char *buf_data, size_t buf_size)
-{
- int result;
- size_t mult_size = 0;
- char *mult_buf = NULL;
- struct flb_tail_file *file = data;
- struct flb_tail_config *ctx = file->config;
-
- if (ctx->path_key == NULL && ctx->offset_key == NULL) {
- ml_stream_buffer_append(file, buf_data, buf_size);
- }
- else {
- /* adjust the records in a new buffer */
- result = record_append_custom_keys(file,
- buf_data,
- buf_size,
- &mult_buf,
- &mult_size);
-
- if (result < 0) {
- ml_stream_buffer_append(file, buf_data, buf_size);
- }
- else {
- ml_stream_buffer_append(file, mult_buf, mult_size);
-
- flb_free(mult_buf);
- }
- }
-
- if (mst->forced_flush) {
- ml_stream_buffer_flush(ctx, file);
- }
-
- return 0;
-}
-
-int flb_tail_file_append(char *path, struct stat *st, int mode,
- struct flb_tail_config *ctx)
-{
- int fd;
- int ret;
- uint64_t stream_id;
- uint64_t ts;
- uint64_t hash_bits;
- flb_sds_t hash_key;
- size_t len;
- char *tag;
- char *name;
- size_t tag_len;
- struct flb_tail_file *file;
- struct stat lst;
- flb_sds_t inode_str;
-
- if (!S_ISREG(st->st_mode)) {
- return -1;
- }
-
- if (flb_tail_file_exists(st, ctx) == FLB_TRUE) {
- return -1;
- }
-
- fd = open(path, O_RDONLY);
- if (fd == -1) {
- flb_errno();
- flb_plg_error(ctx->ins, "cannot open %s", path);
- return -1;
- }
-
- file = flb_calloc(1, sizeof(struct flb_tail_file));
- if (!file) {
- flb_errno();
- goto error;
- }
-
- /* Initialize */
- file->watch_fd = -1;
- file->fd = fd;
-
- /* On non-windows environments check if the original path is a link */
- ret = lstat(path, &lst);
- if (ret == 0) {
- if (S_ISLNK(lst.st_mode)) {
- file->is_link = FLB_TRUE;
- file->link_inode = lst.st_ino;
- }
- }
-
- /* get unique hash for this file */
- ret = stat_to_hash_bits(ctx, st, &hash_bits);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path);
- goto error;
- }
- file->hash_bits = hash_bits;
-
- /* store the hash key used for hash_bits */
- ret = stat_to_hash_key(ctx, st, &hash_key);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path);
- goto error;
- }
- file->hash_key = hash_key;
-
- file->inode = st->st_ino;
- file->offset = 0;
- file->size = st->st_size;
- file->buf_len = 0;
- file->parsed = 0;
- file->config = ctx;
- file->tail_mode = mode;
- file->tag_len = 0;
- file->tag_buf = NULL;
- file->rotated = 0;
- file->pending_bytes = 0;
- file->mult_firstline = FLB_FALSE;
- file->mult_keys = 0;
- file->mult_flush_timeout = 0;
- file->mult_skipping = FLB_FALSE;
-
- /*
- * Duplicate string into 'file' structure, the called function
- * take cares to resolve real-name of the file in case we are
- * running in a non-Linux system.
- *
- * Depending of the operating system, the way to obtain the file
- * name associated to it file descriptor can have different behaviors
- * specifically if it root path it's under a symbolic link. On Linux
- * we can trust the file name but in others it's better to solve it
- * with some extra calls.
- */
- ret = flb_tail_file_name_dup(path, file);
- if (!file->name) {
- flb_errno();
- goto error;
- }
-
- /* We keep a copy of the initial filename in orig_name. This is required
- * for path_key to continue working after rotation. */
- file->orig_name = flb_strdup(file->name);
- if (!file->orig_name) {
- flb_free(file->name);
- flb_errno();
- goto error;
- }
- file->orig_name_len = file->name_len;
-
- /* multiline msgpack buffers */
- file->mult_records = 0;
- msgpack_sbuffer_init(&file->mult_sbuf);
- msgpack_packer_init(&file->mult_pck, &file->mult_sbuf,
- msgpack_sbuffer_write);
-
- /* docker mode */
- file->dmode_flush_timeout = 0;
- file->dmode_complete = true;
- file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
- file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0);
- file->dmode_firstline = false;
-#ifdef FLB_HAVE_SQLDB
- file->db_id = 0;
-#endif
- file->skip_next = FLB_FALSE;
- file->skip_warn = FLB_FALSE;
-
- /* Multiline core mode */
- if (ctx->ml_ctx) {
- /*
- * Create inode str to get stream_id.
- *
- * If stream_id is created by filename,
- * it will be same after file rotation and it causes invalid destruction:
- *
- * - https://github.com/fluent/fluent-bit/issues/4190
- */
- inode_str = flb_sds_create_size(64);
- flb_sds_printf(&inode_str, "%"PRIu64, file->inode);
-
- /* Create a stream for this file */
- ret = flb_ml_stream_create(ctx->ml_ctx,
- inode_str, flb_sds_len(inode_str),
- ml_flush_callback, file,
- &stream_id);
- if (ret != 0) {
- flb_plg_error(ctx->ins,
- "could not create multiline stream for file: %s",
- inode_str);
- flb_sds_destroy(inode_str);
- goto error;
- }
- file->ml_stream_id = stream_id;
- flb_sds_destroy(inode_str);
-
- /*
- * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready
- * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In
- * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline
- * without any previous buffering which leads to performance degradation.
- *
- * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish
- * processing the "read() bytes".
- */
- }
-
- /* Local buffer */
- file->buf_size = ctx->buf_chunk_size;
- file->buf_data = flb_malloc(file->buf_size);
- if (!file->buf_data) {
- flb_errno();
- goto error;
- }
-
- /* Initialize (optional) dynamic tag */
- if (ctx->dynamic_tag == FLB_TRUE) {
- len = ctx->ins->tag_len + strlen(path) + 1;
- tag = flb_malloc(len);
- if (!tag) {
- flb_errno();
- flb_plg_error(ctx->ins, "failed to allocate tag buffer");
- goto error;
- }
-#ifdef FLB_HAVE_REGEX
- ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx);
-#else
- ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx);
-#endif
- if (ret == 0) {
- file->tag_len = tag_len;
- file->tag_buf = flb_strdup(tag);
- }
- flb_free(tag);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path);
- goto error;
- }
- }
- else {
- file->tag_len = strlen(ctx->ins->tag);
- file->tag_buf = flb_strdup(ctx->ins->tag);
- }
- if (!file->tag_buf) {
- flb_plg_error(ctx->ins, "failed to set tag for file: %s", path);
- flb_errno();
- goto error;
- }
-
- if (mode == FLB_TAIL_STATIC) {
- mk_list_add(&file->_head, &ctx->files_static);
- ctx->files_static_count++;
- flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key),
- file, sizeof(file));
- tail_signal_manager(file->config);
- }
- else if (mode == FLB_TAIL_EVENT) {
- mk_list_add(&file->_head, &ctx->files_event);
- flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
- file, sizeof(file));
-
- /* Register this file into the fs_event monitoring */
- ret = flb_tail_fs_add(ctx, file);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "could not register file into fs_events");
- goto error;
- }
- }
-
- /* Set the file position (database offset, head or tail) */
- ret = set_file_position(ctx, file);
- if (ret == -1) {
- flb_tail_file_remove(file);
- goto error;
- }
-
- /* Remaining bytes to read */
- file->pending_bytes = file->size - file->offset;
-
-#ifdef FLB_HAVE_METRICS
- name = (char *) flb_input_name(ctx->ins);
- ts = cfl_time_now();
- cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name});
-
- /* Old api */
- flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
-#endif
-
- file->sl_log_event_encoder = flb_log_event_encoder_create(
- FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (file->sl_log_event_encoder == NULL) {
- flb_tail_file_remove(file);
-
- goto error;
- }
-
- file->ml_log_event_encoder = flb_log_event_encoder_create(
- FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (file->ml_log_event_encoder == NULL) {
- flb_tail_file_remove(file);
-
- goto error;
- }
-
- flb_plg_debug(ctx->ins,
- "inode=%"PRIu64" with offset=%"PRId64" appended as %s",
- file->inode, file->offset, path);
- return 0;
-
-error:
- if (file) {
- if (file->buf_data) {
- flb_free(file->buf_data);
- }
- if (file->name) {
- flb_free(file->name);
- }
- flb_free(file);
- }
- close(fd);
-
- return -1;
-}
-
-void flb_tail_file_remove(struct flb_tail_file *file)
-{
- uint64_t ts;
- char *name;
- struct flb_tail_config *ctx;
-
- ctx = file->config;
-
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s",
- file->inode, file->name);
-
- /* remove the multiline.core stream */
- if (ctx->ml_ctx && file->ml_stream_id > 0) {
- /* destroy ml stream */
- flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id);
- }
-
- if (file->rotated > 0) {
-#ifdef FLB_HAVE_SQLDB
- /*
- * Make sure to remove a the file entry from the database if the file
- * was rotated and it's not longer being monitored.
- */
- if (ctx->db) {
- flb_tail_db_file_delete(file, file->config);
- }
-#endif
- mk_list_del(&file->_rotate_head);
- }
-
- msgpack_sbuffer_destroy(&file->mult_sbuf);
-
- if (file->sl_log_event_encoder != NULL) {
- flb_log_event_encoder_destroy(file->sl_log_event_encoder);
- }
-
- if (file->ml_log_event_encoder != NULL) {
- flb_log_event_encoder_destroy(file->ml_log_event_encoder);
- }
-
- flb_sds_destroy(file->dmode_buf);
- flb_sds_destroy(file->dmode_lastline);
- mk_list_del(&file->_head);
- flb_tail_fs_remove(ctx, file);
-
- /* avoid deleting file with -1 fd */
- if (file->fd != -1) {
- close(file->fd);
- }
- if (file->tag_buf) {
- flb_free(file->tag_buf);
- }
-
- /* remove any potential entry from the hash tables */
- flb_hash_table_del(ctx->static_hash, file->hash_key);
- flb_hash_table_del(ctx->event_hash, file->hash_key);
-
- flb_free(file->buf_data);
- flb_free(file->name);
- flb_free(file->orig_name);
- flb_free(file->real_name);
- flb_sds_destroy(file->hash_key);
-
-#ifdef FLB_HAVE_METRICS
- name = (char *) flb_input_name(ctx->ins);
- ts = cfl_time_now();
- cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name});
-
- /* old api */
- flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics);
-#endif
-
- flb_free(file);
-}
-
-int flb_tail_file_remove_all(struct flb_tail_config *ctx)
-{
- int count = 0;
- struct mk_list *head;
- struct mk_list *tmp;
- struct flb_tail_file *file;
-
- mk_list_foreach_safe(head, tmp, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- flb_tail_file_remove(file);
- count++;
- }
-
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- flb_tail_file_remove(file);
- count++;
- }
-
- return count;
-}
-
-static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file)
-{
- int ret;
- int64_t offset;
- struct stat st;
-
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_errno();
- return FLB_TAIL_ERROR;
- }
-
- /* Check if the file was truncated */
- if (file->offset > st.st_size) {
- offset = lseek(file->fd, 0, SEEK_SET);
- if (offset == -1) {
- flb_errno();
- return FLB_TAIL_ERROR;
- }
-
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
- file->inode, file->name);
- file->offset = offset;
- file->buf_len = 0;
-
- /* Update offset in the database file */
-#ifdef FLB_HAVE_SQLDB
- if (ctx->db) {
- flb_tail_db_file_offset(file, ctx);
- }
-#endif
- }
- else {
- file->size = st.st_size;
- file->pending_bytes = (st.st_size - file->offset);
- }
-
- return FLB_TAIL_OK;
-}
-
-int flb_tail_file_chunk(struct flb_tail_file *file)
-{
- int ret;
- char *tmp;
- size_t size;
- size_t capacity;
- size_t processed_bytes;
- ssize_t bytes;
- struct flb_tail_config *ctx;
-
- /* Check if we the engine issued a pause */
- ctx = file->config;
- if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
- return FLB_TAIL_BUSY;
- }
-
- capacity = (file->buf_size - file->buf_len) - 1;
- if (capacity < 1) {
- /*
- * If there is no more room for more data, try to increase the
- * buffer under the limit of buffer_max_size.
- */
- if (file->buf_size >= ctx->buf_max_size) {
- if (ctx->skip_long_lines == FLB_FALSE) {
- flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "
- "lines are too long. Skipping file.", file->name);
- return FLB_TAIL_ERROR;
- }
-
- /* Warn the user */
- if (file->skip_warn == FLB_FALSE) {
- flb_plg_warn(ctx->ins, "file=%s have long lines. "
- "Skipping long lines.", file->name);
- file->skip_warn = FLB_TRUE;
- }
-
- /* Do buffer adjustments */
- file->offset += file->buf_len;
- file->buf_len = 0;
- file->skip_next = FLB_TRUE;
- }
- else {
- size = file->buf_size + ctx->buf_chunk_size;
- if (size > ctx->buf_max_size) {
- size = ctx->buf_max_size;
- }
-
- /* Increase the buffer size */
- tmp = flb_realloc(file->buf_data, size);
- if (tmp) {
- flb_plg_trace(ctx->ins, "file=%s increase buffer size "
- "%lu => %lu bytes",
- file->name, file->buf_size, size);
- file->buf_data = tmp;
- file->buf_size = size;
- }
- else {
- flb_errno();
- flb_plg_error(ctx->ins, "cannot increase buffer size for %s, "
- "skipping file.", file->name);
- return FLB_TAIL_ERROR;
- }
- }
- capacity = (file->buf_size - file->buf_len) - 1;
- }
-
- bytes = read(file->fd, file->buf_data + file->buf_len, capacity);
- if (bytes > 0) {
- /* we read some data, let the content processor take care of it */
- file->buf_len += bytes;
- file->buf_data[file->buf_len] = '\0';
-
- /* Now that we have some data in the buffer, call the data processor
- * which aims to cut lines and register the entries into the engine.
- *
- * The returned value is the absolute offset the file must be seek
- * now. It may need to get back a few bytes at the beginning of a new
- * line.
- */
- ret = process_content(file, &processed_bytes);
- if (ret < 0) {
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
- file->inode, file->name);
- return FLB_TAIL_ERROR;
- }
-
- /* Adjust the file offset and buffer */
- file->offset += processed_bytes;
- consume_bytes(file->buf_data, processed_bytes, file->buf_len);
- file->buf_len -= processed_bytes;
- file->buf_data[file->buf_len] = '\0';
-
-#ifdef FLB_HAVE_SQLDB
- if (file->config->db) {
- flb_tail_db_file_offset(file, file->config);
- }
-#endif
-
- /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
- ret = adjust_counters(ctx, file);
-
- /* Data was consumed but likely some bytes still remain */
- return ret;
- }
- else if (bytes == 0) {
- /* We reached the end of file, let's wait for some incoming data */
- ret = adjust_counters(ctx, file);
- if (ret == FLB_TAIL_OK) {
- return FLB_TAIL_WAIT;
- }
- else {
- return FLB_TAIL_ERROR;
- }
- }
- else {
- /* error */
- flb_errno();
- flb_plg_error(ctx->ins, "error reading %s", file->name);
- return FLB_TAIL_ERROR;
- }
-
- return FLB_TAIL_ERROR;
-}
-
-/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */
-int flb_tail_file_is_rotated(struct flb_tail_config *ctx,
- struct flb_tail_file *file)
-{
- int ret;
- char *name;
- struct stat st;
-
- /*
- * Do not double-check already rotated files since the caller of this
- * function will trigger a rotation.
- */
- if (file->rotated != 0) {
- return FLB_FALSE;
- }
-
- /* Check if the 'original monitored file' is a link and rotated */
- if (file->is_link == FLB_TRUE) {
- ret = lstat(file->name, &st);
- if (ret == -1) {
- /* Broken link or missing file */
- if (errno == ENOENT) {
- flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s",
- file->link_inode, file->name);
- return FLB_TRUE;
- }
- else {
- flb_errno();
- flb_plg_error(ctx->ins,
- "link_inode=%"PRIu64" cannot detect if file: %s",
- file->link_inode, file->name);
- return -1;
- }
- }
- else {
- /* The file name is there, check if the same that we have */
- if (st.st_ino != file->link_inode) {
- return FLB_TRUE;
- }
- }
- }
-
- /* Retrieve the real file name, operating system lookup */
- name = flb_tail_file_name(file);
- if (!name) {
- flb_plg_error(ctx->ins,
- "inode=%"PRIu64" cannot detect if file was rotated: %s",
- file->inode, file->name);
- return -1;
- }
-
-
- /* Get stats from the file name */
- ret = stat(name, &st);
- if (ret == -1) {
- flb_errno();
- flb_free(name);
- return -1;
- }
-
- /* Compare inodes and names */
- if (file->inode == st.st_ino &&
- flb_tail_target_file_name_cmp(name, file) == 0) {
- flb_free(name);
- return FLB_FALSE;
- }
-
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s",
- file->inode, file->name, name);
-
- flb_free(name);
- return FLB_TRUE;
-}
-
-/* Promote a event in the static list to the dynamic 'events' interface */
-int flb_tail_file_to_event(struct flb_tail_file *file)
-{
- int ret;
- struct stat st;
- struct flb_tail_config *ctx = file->config;
-
- /* Check if the file promoted have pending bytes */
- ret = fstat(file->fd, &st);
- if (ret != 0) {
- flb_errno();
- return -1;
- }
-
- if (file->offset < st.st_size) {
- file->pending_bytes = (st.st_size - file->offset);
- tail_signal_pending(file->config);
- }
- else {
- file->pending_bytes = 0;
- }
-
- /* Check if the file has been rotated */
- ret = flb_tail_file_is_rotated(ctx, file);
- if (ret == FLB_TRUE) {
- flb_tail_file_rotated(file);
- }
-
- /* Notify the fs-event handler that we will start monitoring this 'file' */
- ret = flb_tail_fs_add(ctx, file);
- if (ret == -1) {
- return -1;
- }
-
- /* List swap: change from 'static' to 'event' list */
- mk_list_del(&file->_head);
- ctx->files_static_count--;
- flb_hash_table_del(ctx->static_hash, file->hash_key);
-
- mk_list_add(&file->_head, &file->config->files_event);
- flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
- file, sizeof(file));
-
- file->tail_mode = FLB_TAIL_EVENT;
-
- return 0;
-}
-
-/*
- * Given an open file descriptor, return the filename. This function is a
- * bit slow and it aims to be used only when a file is rotated.
- */
-char *flb_tail_file_name(struct flb_tail_file *file)
-{
- int ret;
- char *buf;
-#ifdef __linux__
- ssize_t s;
- char tmp[128];
-#elif defined(__APPLE__)
- char path[PATH_MAX];
-#elif defined(FLB_SYSTEM_WINDOWS)
- HANDLE h;
-#elif defined(FLB_SYSTEM_FREEBSD)
- struct kinfo_file *file_entries;
- int file_count;
- int file_index;
-#endif
-
- buf = flb_malloc(PATH_MAX);
- if (!buf) {
- flb_errno();
- return NULL;
- }
-
-#ifdef __linux__
- ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd);
- if (ret == -1) {
- flb_errno();
- flb_free(buf);
- return NULL;
- }
-
- s = readlink(tmp, buf, PATH_MAX);
- if (s == -1) {
- flb_free(buf);
- flb_errno();
- return NULL;
- }
- buf[s] = '\0';
-
-#elif __APPLE__
- int len;
-
- ret = fcntl(file->fd, F_GETPATH, path);
- if (ret == -1) {
- flb_errno();
- flb_free(buf);
- return NULL;
- }
-
- len = strlen(path);
- memcpy(buf, path, len);
- buf[len] = '\0';
-
-#elif defined(FLB_SYSTEM_WINDOWS)
- int len;
-
- h = (HANDLE) _get_osfhandle(file->fd);
- if (h == INVALID_HANDLE_VALUE) {
- flb_errno();
- flb_free(buf);
- return NULL;
- }
-
- /* This function returns the length of the string excluding "\0"
- * and the resulting path has a "\\?\" prefix.
- */
- len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED);
- if (len == 0 || len >= PATH_MAX) {
- flb_free(buf);
- return NULL;
- }
-
- if (strstr(buf, "\\\\?\\")) {
- memmove(buf, buf + 4, len + 1);
- }
-#elif defined(FLB_SYSTEM_FREEBSD)
- if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) {
- flb_free(buf);
- return NULL;
- }
-
- for (file_index=0; file_index < file_count; file_index++) {
- if (file_entries[file_index].kf_fd == file->fd) {
- strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1);
- buf[PATH_MAX - 1] = 0;
- break;
- }
- }
- free(file_entries);
-#endif
- return buf;
-}
-
-int flb_tail_file_name_dup(char *path, struct flb_tail_file *file)
-{
- file->name = flb_strdup(path);
- if (!file->name) {
- flb_errno();
- return -1;
- }
- file->name_len = strlen(file->name);
-
- if (file->real_name) {
- flb_free(file->real_name);
- }
-
- file->real_name = flb_tail_file_name(file);
- if (!file->real_name) {
- flb_errno();
- flb_free(file->name);
- file->name = NULL;
- return -1;
- }
-
- return 0;
-}
-
-/* Invoked every time a file was rotated */
-int flb_tail_file_rotated(struct flb_tail_file *file)
-{
- int ret;
- uint64_t ts;
- char *name;
- char *i_name;
- char *tmp;
- struct stat st;
- struct flb_tail_config *ctx = file->config;
-
- /* Get the new file name */
- name = flb_tail_file_name(file);
- if (!name) {
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s",
- file->inode, file->name, name);
-
- /* Update local file entry */
- tmp = file->name;
- flb_tail_file_name_dup(name, file);
- flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s",
- file->inode, tmp, file->name);
- if (file->rotated == 0) {
- file->rotated = time(NULL);
- mk_list_add(&file->_rotate_head, &file->config->files_rotated);
-
- /* Rotate the file in the database */
-#ifdef FLB_HAVE_SQLDB
- if (file->config->db) {
- ret = flb_tail_db_file_rotate(name, file, file->config);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "could not rotate file %s->%s in database",
- file->name, name);
- }
- }
-#endif
-
-#ifdef FLB_HAVE_METRICS
- i_name = (char *) flb_input_name(ctx->ins);
- ts = cfl_time_now();
- cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name});
-
- /* OLD api */
- flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED,
- 1, file->config->ins->metrics);
-#endif
-
- /* Check if a new file has been created */
- ret = stat(tmp, &st);
- if (ret == 0 && st.st_ino != file->inode) {
- if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) {
- ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx);
- if (ret == -1) {
- flb_tail_scan(ctx->path_list, ctx);
- }
- else {
- tail_signal_manager(file->config);
- }
- }
- }
- }
- flb_free(tmp);
- flb_free(name);
-
- return 0;
-}
-
-static int check_purge_deleted_file(struct flb_tail_config *ctx,
- struct flb_tail_file *file, time_t ts)
-{
- int ret;
- struct stat st;
-
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name);
- flb_tail_file_remove(file);
- return FLB_TRUE;
- }
-
- if (st.st_nlink == 0) {
- flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s",
- file->name);
-#ifdef FLB_HAVE_SQLDB
- if (ctx->db) {
- /* Remove file entry from the database */
- flb_tail_db_file_delete(file, file->config);
- }
-#endif
- /* Remove file from the monitored list */
- flb_tail_file_remove(file);
- return FLB_TRUE;
- }
-
- return FLB_FALSE;
-}
-
-/* Purge rotated and deleted files */
-int flb_tail_file_purge(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- int ret;
- int count = 0;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_file *file;
- struct flb_tail_config *ctx = context;
- time_t now;
- struct stat st;
-
- /* Rotated files */
- now = time(NULL);
- mk_list_foreach_safe(head, tmp, &ctx->files_rotated) {
- file = mk_list_entry(head, struct flb_tail_file, _rotate_head);
- if ((file->rotated + ctx->rotate_wait) <= now) {
- ret = fstat(file->fd, &st);
- if (ret == 0) {
- flb_plg_debug(ctx->ins,
- "inode=%"PRIu64" purge rotated file %s " \
- "(offset=%"PRId64" / size = %"PRIu64")",
- file->inode, file->name, file->offset, (uint64_t)st.st_size);
- if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) {
- flb_plg_warn(ctx->ins, "purged rotated file while data "
- "ingestion is paused, consider increasing "
- "rotate_wait");
- }
- }
- else {
- flb_plg_debug(ctx->ins,
- "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")",
- file->inode, file->name, file->offset);
- }
-
- flb_tail_file_remove(file);
- count++;
- }
- }
-
- /*
- * Deleted files: under high load scenarios, exists the chances that in
- * our event loop we miss some notifications about a file. In order to
- * sanitize our list of monitored files we will iterate all of them and check
- * if they have been deleted or not.
- */
- mk_list_foreach_safe(head, tmp, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- check_purge_deleted_file(ctx, file, now);
- }
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- check_purge_deleted_file(ctx, file, now);
- }
-
- return count;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_file.h b/fluent-bit/plugins/in_tail/tail_file.h
deleted file mode 100644
index 796224c37..000000000
--- a/fluent-bit/plugins/in_tail/tail_file.h
+++ /dev/null
@@ -1,137 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_FILE_H
-#define FLB_TAIL_FILE_H
-
-#include <sys/types.h>
-#include <sys/stat.h>
-
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_input.h>
-
-#include "tail.h"
-#include "tail_fs.h"
-#include "tail_config.h"
-#include "tail_file_internal.h"
-
-#ifdef FLB_SYSTEM_WINDOWS
-#include "win32.h"
-#endif
-
-#ifdef FLB_HAVE_REGEX
-#define FLB_HASH_TABLE_SIZE 50
-#endif
-
-/* return the file modification time in seconds since epoch */
-static inline int64_t flb_tail_stat_mtime(struct stat *st)
-{
-#if defined(FLB_HAVE_WINDOWS)
- return (int64_t) st->st_mtime;
-#elif defined(__APPLE__) && !defined(_POSIX_C_SOURCE)
- return (int64_t) st->st_mtimespec.tv_sec;
-#elif (_POSIX_C_SOURCE >= 200809L || \
- defined(_BSD_SOURCE) || defined(_SVID_SOURCE) || \
- defined(__BIONIC__) || (defined (__SVR4) && defined (__sun)) || \
- defined(__FreeBSD__) || defined (__linux__))
- return (int64_t) st->st_mtim.tv_sec;
-#elif defined(_AIX)
- return (int64_t) st->st_mtime;
-#else
- return (int64_t) st->st_mtime;
-#endif
-
- /* backend unsupported: submit a PR :) */
- return -1;
-}
-
-static inline int flb_tail_target_file_name_cmp(char *name,
- struct flb_tail_file *file)
-{
- int ret;
- char *name_a = NULL;
- char *name_b = NULL;
- char *base_a = NULL;
- char *base_b = NULL;
-
- name_a = flb_strdup(name);
- if (!name_a) {
- flb_errno();
- ret = -1;
- goto out;
- }
-
- base_a = flb_strdup(basename(name_a));
- if (!base_a) {
- flb_errno();
- ret = -1;
- goto out;
- }
-
-#if defined(FLB_SYSTEM_WINDOWS)
- name_b = flb_strdup(file->real_name);
- if (!name_b) {
- flb_errno();
- ret = -1;
- goto out;
- }
-
- base_b = basename(name_b);
- ret = _stricmp(base_a, base_b);
-#else
- name_b = flb_strdup(file->real_name);
- if (!name_b) {
- flb_errno();
- ret = -1;
- goto out;
- }
- base_b = basename(name_b);
- ret = strcmp(base_a, base_b);
-#endif
-
- out:
- flb_free(name_a);
- flb_free(name_b);
- flb_free(base_a);
-
- /* FYI: 'base_b' never points to a new allocation, no flb_free is needed */
-
- return ret;
-}
-
-int flb_tail_file_name_dup(char *path, struct flb_tail_file *file);
-int flb_tail_file_to_event(struct flb_tail_file *file);
-int flb_tail_file_chunk(struct flb_tail_file *file);
-int flb_tail_file_append(char *path, struct stat *st, int mode,
- struct flb_tail_config *ctx);
-void flb_tail_file_remove(struct flb_tail_file *file);
-int flb_tail_file_remove_all(struct flb_tail_config *ctx);
-char *flb_tail_file_name(struct flb_tail_file *file);
-int flb_tail_file_is_rotated(struct flb_tail_config *ctx,
- struct flb_tail_file *file);
-int flb_tail_file_rotated(struct flb_tail_file *file);
-int flb_tail_file_purge(struct flb_input_instance *ins,
- struct flb_config *config, void *context);
-int flb_tail_pack_line_map(struct flb_time *time, char **data,
- size_t *data_size, struct flb_tail_file *file,
- size_t processed_bytes);
-int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size,
- struct flb_tail_file *file, size_t processed_bytes);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_file_internal.h b/fluent-bit/plugins/in_tail/tail_file_internal.h
deleted file mode 100644
index 6d95c87c1..000000000
--- a/fluent-bit/plugins/in_tail/tail_file_internal.h
+++ /dev/null
@@ -1,130 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_INTERNAL_H
-#define FLB_TAIL_INTERNAL_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_sds.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_log_event_encoder.h>
-
-#ifdef FLB_HAVE_PARSER
-#include <fluent-bit/multiline/flb_ml.h>
-#endif
-
-#include "tail.h"
-#include "tail_config.h"
-
-struct flb_tail_file {
- /* Inotify */
- int watch_fd;
- /* file lookup info */
- int fd;
- int64_t size;
- int64_t offset;
- int64_t last_line;
- uint64_t dev_id;
- uint64_t inode;
- uint64_t link_inode;
- int is_link;
- char *name; /* target file name given by scan routine */
- char *real_name; /* real file name in the file system */
- char *orig_name; /* original file name (before rotation) */
- size_t name_len;
- size_t orig_name_len;
- time_t rotated;
- int64_t pending_bytes;
-
- /* dynamic tag for this file */
- int tag_len;
- char *tag_buf;
-
- /* OLD multiline */
- time_t mult_flush_timeout; /* time when multiline started */
- int mult_firstline; /* bool: mult firstline found ? */
- int mult_firstline_append; /* bool: mult firstline appendable ? */
- int mult_skipping; /* skipping because ignode_older than ? */
- int mult_keys; /* total number of buffered keys */
-
- int mult_records; /* multiline records counter mult_sbuf */
- msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */
- msgpack_packer mult_pck; /* temporary msgpack packer */
- struct flb_time mult_time; /* multiline time parsed from first line */
-
- /* OLD docker mode */
- time_t dmode_flush_timeout; /* time when docker mode started */
- flb_sds_t dmode_buf; /* buffer for docker mode */
- flb_sds_t dmode_lastline; /* last incomplete line */
- bool dmode_complete; /* buffer contains completed log */
- bool dmode_firstline; /* dmode mult firstline found ? */
-
- /* multiline engine: file stream_id and local buffers */
- uint64_t ml_stream_id;
-
- /* content parsing, positions and buffer */
- size_t parsed;
- size_t buf_len;
- size_t buf_size;
- char *buf_data;
-
- /*
- * This value represent the number of bytes procesed by process_content()
- * in the last iteration.
- */
- size_t last_processed_bytes;
-
- /*
- * Long-lines handling: this flag is enabled when a previous line was
- * too long and the buffer did not contain a \n, so when reaching the
- * missing \n, skip that content and move forward.
- *
- * This flag is only set when Skip_Long_Lines is On.
- */
- int skip_next;
-
- /* Did the plugin already warn the user about long lines ? */
- int skip_warn;
-
- /* Opaque data type for specific fs-event backend data */
- void *fs_backend;
-
- /* database reference */
- uint64_t db_id;
-
- uint64_t hash_bits;
- flb_sds_t hash_key;
-
- /* There are dedicated log event encoders for
- * single and multi line events because I am respecting
- * the old behavior which resulted in grouping both types
- * of logs in tail_file.c but I don't know if this is
- * strictly necessary.
- */
- struct flb_log_event_encoder *ml_log_event_encoder;
- struct flb_log_event_encoder *sl_log_event_encoder;
-
- /* reference */
- int tail_mode;
- struct flb_tail_config *config;
- struct mk_list _head;
- struct mk_list _rotate_head;
-};
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_fs.h b/fluent-bit/plugins/in_tail/tail_fs.h
deleted file mode 100644
index 948954333..000000000
--- a/fluent-bit/plugins/in_tail/tail_fs.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_FS_H
-#define FLB_TAIL_FS_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-
-#include "tail_config.h"
-#include "tail_file_internal.h"
-
-#include "tail_fs_stat.h"
-#ifdef FLB_HAVE_INOTIFY
-#include "tail_fs_inotify.h"
-#endif
-
-static inline int flb_tail_fs_init(struct flb_input_instance *in,
- struct flb_tail_config *ctx, struct flb_config *config)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_init(in, ctx, config);
- }
-#endif
- return flb_tail_fs_stat_init(in, ctx, config);
-}
-
-static inline void flb_tail_fs_pause(struct flb_tail_config *ctx)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_pause(ctx);
- }
-#endif
- return flb_tail_fs_stat_pause(ctx);
-}
-
-static inline void flb_tail_fs_resume(struct flb_tail_config *ctx)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_resume(ctx);
- }
-#endif
- return flb_tail_fs_stat_resume(ctx);
-}
-
-static inline int flb_tail_fs_add(struct flb_tail_config *ctx, struct flb_tail_file *file)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_add(file);
- }
-#endif
- return flb_tail_fs_stat_add(file);
-}
-
-static inline int flb_tail_fs_remove(struct flb_tail_config *ctx, struct flb_tail_file *file)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_remove(file);
- }
-#endif
- return flb_tail_fs_stat_remove(file);
-}
-
-static inline int flb_tail_fs_exit(struct flb_tail_config *ctx)
-{
-#ifdef FLB_HAVE_INOTIFY
- if (ctx->inotify_watcher) {
- return flb_tail_fs_inotify_exit(ctx);
- }
-#endif
- return flb_tail_fs_stat_exit(ctx);
-}
-
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.c b/fluent-bit/plugins/in_tail/tail_fs_inotify.c
deleted file mode 100644
index 59d10ca08..000000000
--- a/fluent-bit/plugins/in_tail/tail_fs_inotify.c
+++ /dev/null
@@ -1,433 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _DEFAULT_SOURCE
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_input_plugin.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/inotify.h>
-
-#include "tail_config.h"
-#include "tail_file.h"
-#include "tail_db.h"
-#include "tail_signal.h"
-
-#include <limits.h>
-#include <fcntl.h>
-
-#include <sys/ioctl.h>
-
-static int debug_event_mask(struct flb_tail_config *ctx,
- struct flb_tail_file *file,
- uint32_t mask)
-{
- flb_sds_t buf;
- int buf_size = 256;
-
- /* Only enter this function if debug mode is allowed */
- if (flb_log_check(FLB_LOG_DEBUG) == 0) {
- return 0;
- }
-
- if (file) {
- buf_size = file->name_len + 128;
- }
-
- if (buf_size < 256) {
- buf_size = 256;
- }
-
- /* Create buffer */
- buf = flb_sds_create_size(buf_size);
- if (!buf) {
- return -1;
- }
-
- /* Print info into sds */
- if (file) {
- flb_sds_printf(&buf, "inode=%"PRIu64", %s, events: ", file->inode, file->name);
- }
- else {
- flb_sds_printf(&buf, "events: ");
- }
-
- if (mask & IN_ATTRIB) {
- flb_sds_printf(&buf, "IN_ATTRIB ");
- }
- if (mask & IN_IGNORED) {
- flb_sds_printf(&buf, "IN_IGNORED ");
- }
- if (mask & IN_MODIFY) {
- flb_sds_printf(&buf, "IN_MODIFY ");
- }
- if (mask & IN_MOVE_SELF) {
- flb_sds_printf(&buf, "IN_MOVE_SELF ");
- }
- if (mask & IN_Q_OVERFLOW) {
- flb_sds_printf(&buf, "IN_Q_OVERFLOW ");
- }
-
- flb_plg_debug(ctx->ins, "%s", buf);
- flb_sds_destroy(buf);
-
- return 0;
-}
-
-static int tail_fs_add(struct flb_tail_file *file, int check_rotated)
-{
- int flags;
- int watch_fd;
- char *name;
- struct flb_tail_config *ctx = file->config;
-
- /*
- * If there is no watcher associated, we only want to monitor events if
- * this file is rotated to somewhere. Note at this point we are polling
- * lines from the file and once we reach EOF (and a watch_fd exists),
- * we update the flags to receive notifications.
- */
- flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW;
-
- if (check_rotated == FLB_TRUE) {
- flags |= IN_MOVE_SELF;
- }
-
- /*
- * Double check real name of the file associated to the inode:
- *
- * Inotify interface in the Kernel uses the inode number as a real reference
- * for the file we have opened. If for some reason the file we are pointing
- * out in file->name has been rotated and not been updated, we might not add
- * the watch to the real file we aim to.
- *
- * A case like this can generate the issue:
- *
- * 1. inode=1 : file a.log is being watched
- * 2. inode=1 : file a.log is rotated to a.log.1, but notification not
- * delivered yet.
- * 3. inode=2 : new file 'a.log' is created
- * 4. inode=2 : the scan_path routine discover the new 'a.log' file
- * 5. inode=2 : add an inotify watcher for 'a.log'
- * 6. conflict: inotify_add_watch() receives the path 'a.log',
- */
-
- name = flb_tail_file_name(file);
- if (!name) {
- flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot get real filename for inotify",
- file->inode);
- return -1;
- }
-
- /* Register or update the flags */
- watch_fd = inotify_add_watch(ctx->fd_notify, name, flags);
- flb_free(name);
-
- if (watch_fd == -1) {
- flb_errno();
- if (errno == ENOSPC) {
- flb_plg_error(ctx->ins, "inotify: The user limit on the total "
- "number of inotify watches was reached or the kernel "
- "failed to allocate a needed resource (ENOSPC)");
- }
- return -1;
- }
- file->watch_fd = watch_fd;
- flb_plg_info(ctx->ins, "inotify_fs_add(): inode=%"PRIu64" watch_fd=%i name=%s",
- file->inode, watch_fd, file->name);
- return 0;
-}
-
-static int flb_tail_fs_add_rotated(struct flb_tail_file *file)
-{
- return tail_fs_add(file, FLB_FALSE);
-}
-
-static int tail_fs_event(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret;
- int64_t offset;
- struct mk_list *head;
- struct mk_list *tmp;
- struct flb_tail_config *ctx = in_context;
- struct flb_tail_file *file = NULL;
- struct inotify_event ev;
- struct stat st;
-
- /* Read the event */
- ret = read(ctx->fd_notify, &ev, sizeof(struct inotify_event));
- if (ret < 1) {
- return -1;
- }
-
- /* Lookup watched file */
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- if (file->watch_fd != ev.wd) {
- file = NULL;
- continue;
- }
- break;
- }
-
- if (!file) {
- return -1;
- }
-
- /* Debug event */
- debug_event_mask(ctx, file, ev.mask);
-
- if (ev.mask & IN_IGNORED) {
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" watch_fd=%i IN_IGNORED",
- file->inode, ev.wd);
- return -1;
- }
-
- /* Check file rotation (only if it has not been rotated before) */
- if (ev.mask & IN_MOVE_SELF && file->rotated == 0) {
- flb_plg_debug(ins, "inode=%"PRIu64" rotated IN_MOVE SELF '%s'",
- file->inode, file->name);
-
- /* A rotated file must be re-registered */
- flb_tail_file_rotated(file);
- flb_tail_fs_remove(ctx, file);
- flb_tail_fs_add_rotated(file);
- }
-
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing",
- file->inode, file->name);
- flb_tail_file_remove(file);
- return 0;
- }
- file->size = st.st_size;
- file->pending_bytes = (file->size - file->offset);
-
- /* File was removed ? */
- if (ev.mask & IN_ATTRIB) {
- /* Check if the file have been deleted */
- if (st.st_nlink == 0) {
- flb_plg_debug(ins, "inode=%"PRIu64" file has been deleted: %s",
- file->inode, file->name);
-
-#ifdef FLB_HAVE_SQLDB
- if (ctx->db) {
- /* Remove file entry from the database */
- flb_tail_db_file_delete(file, ctx);
- }
-#endif
- /* Remove file from the monitored list */
- flb_tail_file_remove(file);
- return 0;
- }
- }
-
- if (ev.mask & IN_MODIFY) {
- /*
- * The file was modified, check how many new bytes do
- * we have.
- */
-
- /* Check if the file was truncated */
- if (file->offset > st.st_size) {
- offset = lseek(file->fd, 0, SEEK_SET);
- if (offset == -1) {
- flb_errno();
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
- file->inode, file->name);
- file->offset = offset;
- file->buf_len = 0;
-
- /* Update offset in the database file */
-#ifdef FLB_HAVE_SQLDB
- if (ctx->db) {
- flb_tail_db_file_offset(file, ctx);
- }
-#endif
- }
- }
-
- /* Collect the data */
- ret = in_tail_collect_event(file, config);
- if (ret != FLB_TAIL_ERROR) {
- /*
- * Due to read buffer size capacity, there are some cases where the
- * read operation cannot consume all new data available on one
- * round; upon successfull read(2) some data can still remain.
- *
- * If that is the case, we set in the structure how
- * many bytes are available 'now', so then the further
- * routine that check pending bytes and then the inotified-file
- * can process them properly after an internal signal.
- *
- * The goal to defer this routine is to avoid a blocking
- * read(2) operation, that might kill performance. Just let's
- * wait a second and do a good job.
- */
- tail_signal_pending(ctx);
- }
- else {
- return ret;
- }
-
- return 0;
-}
-
-static int in_tail_progress_check_callback(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- int ret = 0;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_config *ctx = context;
- struct flb_tail_file *file;
- int pending_data_detected;
- struct stat st;
-
- (void) config;
-
- pending_data_detected = FLB_FALSE;
-
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
-
- if (file->offset < file->size) {
- pending_data_detected = FLB_TRUE;
-
- continue;
- }
-
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_errno();
- flb_plg_error(ins, "fstat error");
-
- continue;
- }
-
- if (file->offset < st.st_size) {
- file->size = st.st_size;
- file->pending_bytes = (file->size - file->offset);
-
- pending_data_detected = FLB_TRUE;
- }
- }
-
- if (pending_data_detected) {
- tail_signal_pending(ctx);
- }
-
- return 0;
-}
-
-/* File System events based on Inotify(2). Linux >= 2.6.32 is suggested */
-int flb_tail_fs_inotify_init(struct flb_input_instance *in,
- struct flb_tail_config *ctx, struct flb_config *config)
-{
- int fd;
- int ret;
-
- flb_plg_debug(ctx->ins, "flb_tail_fs_inotify_init() initializing inotify tail input");
-
- /* Create inotify instance */
- fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
- if (fd == -1) {
- flb_errno();
- return -1;
- }
- flb_plg_debug(ctx->ins, "inotify watch fd=%i", fd);
- ctx->fd_notify = fd;
-
- /* This backend use Fluent Bit event-loop to trigger notifications */
- ret = flb_input_set_collector_event(in, tail_fs_event,
- ctx->fd_notify, config);
- if (ret < 0) {
- close(fd);
- return -1;
- }
- ctx->coll_fd_fs1 = ret;
-
- /* Register callback to check current tail offsets */
- ret = flb_input_set_collector_time(in, in_tail_progress_check_callback,
- ctx->progress_check_interval,
- ctx->progress_check_interval_nsec,
- config);
- if (ret == -1) {
- flb_tail_config_destroy(ctx);
- return -1;
- }
- ctx->coll_fd_progress_check = ret;
-
- return 0;
-}
-
-void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx)
-{
- flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins);
-}
-
-void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx)
-{
- flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins);
-}
-
-int flb_tail_fs_inotify_add(struct flb_tail_file *file)
-{
- int ret;
- struct flb_tail_config *ctx = file->config;
-
- ret = tail_fs_add(file, FLB_TRUE);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot register file %s",
- file->inode, file->name);
- return -1;
- }
-
- return 0;
-}
-
-int flb_tail_fs_inotify_remove(struct flb_tail_file *file)
-{
- struct flb_tail_config *ctx = file->config;
-
- if (file->watch_fd == -1) {
- return 0;
- }
-
- flb_plg_info(ctx->ins, "inotify_fs_remove(): inode=%"PRIu64" watch_fd=%i",
- file->inode, file->watch_fd);
-
- inotify_rm_watch(file->config->fd_notify, file->watch_fd);
- file->watch_fd = -1;
- return 0;
-}
-
-int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx)
-{
- return close(ctx->fd_notify);
-}
diff --git a/fluent-bit/plugins/in_tail/tail_fs_inotify.h b/fluent-bit/plugins/in_tail/tail_fs_inotify.h
deleted file mode 100644
index 128ab0624..000000000
--- a/fluent-bit/plugins/in_tail/tail_fs_inotify.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_FS_INOTIFY_H
-#define FLB_TAIL_FS_INOTIFY_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-
-#include "tail_config.h"
-#include "tail_file_internal.h"
-
-int flb_tail_fs_inotify_init(struct flb_input_instance *in,
- struct flb_tail_config *ctx, struct flb_config *config);
-int flb_tail_fs_inotify_add(struct flb_tail_file *file);
-int flb_tail_fs_inotify_remove(struct flb_tail_file *file);
-int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx);
-void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx);
-void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.c b/fluent-bit/plugins/in_tail/tail_fs_stat.c
deleted file mode 100644
index 6b312c9bd..000000000
--- a/fluent-bit/plugins/in_tail/tail_fs_stat.c
+++ /dev/null
@@ -1,253 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _DEFAULT_SOURCE
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-
-#include "tail_file.h"
-#include "tail_db.h"
-#include "tail_config.h"
-#include "tail_signal.h"
-
-#ifdef FLB_SYSTEM_WINDOWS
-#include "win32.h"
-#endif
-
-struct fs_stat {
- /* last time check */
- time_t checked;
-
- /* previous status */
- struct stat st;
-};
-
-static int tail_fs_event(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret;
- struct mk_list *head;
- struct mk_list *tmp;
- struct flb_tail_config *ctx = in_context;
- struct flb_tail_file *file = NULL;
- struct fs_stat *fst;
- struct stat st;
- time_t t;
-
- t = time(NULL);
-
- /* Lookup watched file */
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- fst = file->fs_backend;
-
- /* Check current status of the file */
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_errno();
- continue;
- }
-
- /* Check if the file was modified */
- if ((fst->st.st_mtime != st.st_mtime) ||
- (fst->st.st_size != st.st_size)) {
- /* Update stat info and trigger the notification */
- memcpy(&fst->st, &st, sizeof(struct stat));
- fst->checked = t;
- in_tail_collect_event(file, config);
- }
- }
-
- return 0;
-}
-
-static int tail_fs_check(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret;
- int64_t offset;
- char *name;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_config *ctx = in_context;
- struct flb_tail_file *file = NULL;
- struct fs_stat *fst;
- struct stat st;
-
- /* Lookup watched file */
- mk_list_foreach_safe(head, tmp, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- fst = file->fs_backend;
-
- ret = fstat(file->fd, &st);
- if (ret == -1) {
- flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name);
- flb_tail_file_remove(file);
- continue;
- }
-
- /* Check if the file have been deleted */
- if (st.st_nlink == 0) {
- flb_plg_debug(ctx->ins, "file has been deleted: %s", file->name);
-#ifdef FLB_HAVE_SQLDB
- if (ctx->db) {
- /* Remove file entry from the database */
- flb_tail_db_file_delete(file, ctx);
- }
-#endif
- flb_tail_file_remove(file);
- continue;
- }
-
- /* Check if the file was truncated */
- if (file->offset > st.st_size) {
- offset = lseek(file->fd, 0, SEEK_SET);
- if (offset == -1) {
- flb_errno();
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "file truncated %s", file->name);
- file->offset = offset;
- file->buf_len = 0;
- memcpy(&fst->st, &st, sizeof(struct stat));
-
-#ifdef FLB_HAVE_SQLDB
- /* Update offset in database file */
- if (ctx->db) {
- flb_tail_db_file_offset(file, ctx);
- }
-#endif
- }
-
- if (file->offset < st.st_size) {
- file->pending_bytes = (st.st_size - file->offset);
- tail_signal_pending(ctx);
- }
- else {
- file->pending_bytes = 0;
- }
-
-
- /* Discover the current file name for the open file descriptor */
- name = flb_tail_file_name(file);
- if (!name) {
- flb_plg_debug(ctx->ins, "could not resolve %s, removing", file->name);
- flb_tail_file_remove(file);
- continue;
- }
-
- /*
- * Check if file still exists. This method requires explicity that the
- * user is using an absolute path, otherwise we will be rotating the
- * wrong file.
- *
- * flb_tail_target_file_name_cmp is a deeper compare than
- * flb_tail_file_name_cmp. If applicable, it compares to the underlying
- * real_name of the file.
- */
- if (flb_tail_file_is_rotated(ctx, file) == FLB_TRUE) {
- flb_tail_file_rotated(file);
- }
- flb_free(name);
-
- }
-
- return 0;
-}
-
-/* File System events based on stat(2) */
-int flb_tail_fs_stat_init(struct flb_input_instance *in,
- struct flb_tail_config *ctx, struct flb_config *config)
-{
- int ret;
-
- flb_plg_debug(ctx->ins, "flb_tail_fs_stat_init() initializing stat tail input");
-
- /* Set a manual timer to collect events every 0.250 seconds */
- ret = flb_input_set_collector_time(in, tail_fs_event,
- 0, 250000000, config);
- if (ret < 0) {
- return -1;
- }
- ctx->coll_fd_fs1 = ret;
-
- /* Set a manual timer to check deleted/rotated files every 2.5 seconds */
- ret = flb_input_set_collector_time(in, tail_fs_check,
- 2, 500000000, config);
- if (ret < 0) {
- return -1;
- }
- ctx->coll_fd_fs2 = ret;
-
- return 0;
-}
-
-void flb_tail_fs_stat_pause(struct flb_tail_config *ctx)
-{
- flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins);
- flb_input_collector_pause(ctx->coll_fd_fs2, ctx->ins);
-}
-
-void flb_tail_fs_stat_resume(struct flb_tail_config *ctx)
-{
- flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins);
- flb_input_collector_resume(ctx->coll_fd_fs2, ctx->ins);
-}
-
-int flb_tail_fs_stat_add(struct flb_tail_file *file)
-{
- int ret;
- struct fs_stat *fst;
-
- fst = flb_malloc(sizeof(struct fs_stat));
- if (!fst) {
- flb_errno();
- return -1;
- }
-
- fst->checked = time(NULL);
- ret = stat(file->name, &fst->st);
- if (ret == -1) {
- flb_errno();
- flb_free(fst);
- return -1;
- }
- file->fs_backend = fst;
-
- return 0;
-}
-
-int flb_tail_fs_stat_remove(struct flb_tail_file *file)
-{
- if (file->tail_mode == FLB_TAIL_EVENT) {
- flb_free(file->fs_backend);
- }
- return 0;
-}
-
-int flb_tail_fs_stat_exit(struct flb_tail_config *ctx)
-{
- (void) ctx;
- return 0;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_fs_stat.h b/fluent-bit/plugins/in_tail/tail_fs_stat.h
deleted file mode 100644
index 21a0704cb..000000000
--- a/fluent-bit/plugins/in_tail/tail_fs_stat.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_FS_STAT_H
-#define FLB_TAIL_FS_STAT_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-
-#include "tail_config.h"
-#include "tail_file_internal.h"
-
-int flb_tail_fs_stat_init(struct flb_input_instance *in,
- struct flb_tail_config *ctx, struct flb_config *config);
-int flb_tail_fs_stat_add(struct flb_tail_file *file);
-int flb_tail_fs_stat_remove(struct flb_tail_file *file);
-int flb_tail_fs_stat_exit(struct flb_tail_config *ctx);
-void flb_tail_fs_stat_pause(struct flb_tail_config *ctx);
-void flb_tail_fs_stat_resume(struct flb_tail_config *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_multiline.c b/fluent-bit/plugins/in_tail/tail_multiline.c
deleted file mode 100644
index 71c031014..000000000
--- a/fluent-bit/plugins/in_tail/tail_multiline.c
+++ /dev/null
@@ -1,606 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_kv.h>
-
-#include "tail_config.h"
-#include "tail_multiline.h"
-
-static int tail_mult_append(struct flb_parser *parser,
- struct flb_tail_config *ctx)
-{
- struct flb_tail_mult *mp;
-
- mp = flb_malloc(sizeof(struct flb_tail_mult));
- if (!mp) {
- flb_errno();
- return -1;
- }
-
- mp->parser = parser;
- mk_list_add(&mp->_head, &ctx->mult_parsers);
-
- return 0;
-}
-
-int flb_tail_mult_create(struct flb_tail_config *ctx,
- struct flb_input_instance *ins,
- struct flb_config *config)
-{
- int ret;
- const char *tmp;
- struct mk_list *head;
- struct flb_parser *parser;
- struct flb_kv *kv;
-
- if (ctx->multiline_flush <= 0) {
- ctx->multiline_flush = 1;
- }
-
- mk_list_init(&ctx->mult_parsers);
-
- /* Get firstline parser */
- tmp = flb_input_get_property("parser_firstline", ins);
- if (!tmp) {
- flb_plg_error(ctx->ins, "multiline: no parser defined for firstline");
- return -1;
- }
- parser = flb_parser_get(tmp, config);
- if (!parser) {
- flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp);
- return -1;
- }
-
- ctx->mult_parser_firstline = parser;
-
- /* Read all multiline rules */
- mk_list_foreach(head, &ins->properties) {
- kv = mk_list_entry(head, struct flb_kv, _head);
- if (strcasecmp("parser_firstline", kv->key) == 0) {
- continue;
- }
-
- if (strncasecmp("parser_", kv->key, 7) == 0) {
- parser = flb_parser_get(kv->val, config);
- if (!parser) {
- flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val);
- return -1;
- }
-
- ret = tail_mult_append(parser, ctx);
- if (ret == -1) {
- return -1;
- }
- }
- }
-
- return 0;
-}
-
-int flb_tail_mult_destroy(struct flb_tail_config *ctx)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_tail_mult *mp;
-
- if (ctx->multiline == FLB_FALSE) {
- return 0;
- }
-
- mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) {
- mp = mk_list_entry(head, struct flb_tail_mult, _head);
- mk_list_del(&mp->_head);
- flb_free(mp);
- }
-
- return 0;
-}
-
-/* Process the result of a firstline match */
-int flb_tail_mult_process_first(time_t now,
- char *buf, size_t size,
- struct flb_time *out_time,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx)
-{
- int ret;
- size_t off;
- msgpack_object map;
- msgpack_unpacked result;
-
- /* If a previous multiline context already exists, flush first */
- if (file->mult_firstline && !file->mult_skipping) {
- flb_tail_mult_flush(file, ctx);
- }
-
- /* Remark as first multiline message */
- file->mult_firstline = FLB_TRUE;
-
- /* Validate obtained time, if not set, set the current time */
- if (flb_time_to_nanosec(out_time) == 0L) {
- flb_time_get(out_time);
- }
-
- /* Should we skip this multiline record ? */
- if (ctx->ignore_older > 0) {
- if ((now - ctx->ignore_older) > out_time->tm.tv_sec) {
- flb_free(buf);
- file->mult_skipping = FLB_TRUE;
- file->mult_firstline = FLB_TRUE;
-
- /* we expect more data to skip */
- return FLB_TAIL_MULT_MORE;
- }
- }
-
- /* Re-initiate buffers */
- msgpack_sbuffer_init(&file->mult_sbuf);
- msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write);
-
- /*
- * flb_parser_do() always return a msgpack buffer, so we tweak our
- * local msgpack reference to avoid an extra allocation. The only
- * concern is that we don't know what's the real size of the memory
- * allocated, so we assume it's just 'out_size'.
- */
- file->mult_flush_timeout = now + (ctx->multiline_flush - 1);
- file->mult_sbuf.data = buf;
- file->mult_sbuf.size = size;
- file->mult_sbuf.alloc = size;
-
- /* Set multiline status */
- file->mult_firstline = FLB_TRUE;
- file->mult_skipping = FLB_FALSE;
- flb_time_copy(&file->mult_time, out_time);
-
- off = 0;
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result, buf, size, &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- msgpack_sbuffer_destroy(&file->mult_sbuf);
- msgpack_unpacked_destroy(&result);
- return FLB_TAIL_MULT_NA;
- }
-
- map = result.data;
- file->mult_keys = map.via.map.size;
- msgpack_unpacked_destroy(&result);
-
- /* We expect more data */
- return FLB_TAIL_MULT_MORE;
-}
-
-/* Append a raw log entry to the last structured field in the mult buffer */
-static inline void flb_tail_mult_append_raw(char *buf, int size,
- struct flb_tail_file *file,
- struct flb_tail_config *config)
-{
- /* Append the raw string */
- msgpack_pack_str(&file->mult_pck, size);
- msgpack_pack_str_body(&file->mult_pck, buf, size);
-}
-
-/* Check if the last key value type of a map is string or not */
-static inline int is_last_key_val_string(char *buf, size_t size)
-{
- int ret = FLB_FALSE;
- size_t off;
- msgpack_unpacked result;
- msgpack_object v;
- msgpack_object root;
-
- off = 0;
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result, buf, size, &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- return ret;
- }
-
- root = result.data;
- if (root.type != MSGPACK_OBJECT_MAP) {
- ret = FLB_FALSE;
- }
- else {
- if (root.via.map.size == 0) {
- ret = FLB_FALSE;
- }
- else {
- v = root.via.map.ptr[root.via.map.size - 1].val;
- if (v.type == MSGPACK_OBJECT_STR) {
- ret = FLB_TRUE;
- }
- }
- }
-
- msgpack_unpacked_destroy(&result);
- return ret;
-}
-
-int flb_tail_mult_process_content(time_t now,
- char *buf, size_t len,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx,
- size_t processed_bytes)
-{
- int ret;
- size_t off;
- void *out_buf;
- size_t out_size = 0;
- struct mk_list *head;
- struct flb_tail_mult *mult_parser = NULL;
- struct flb_time out_time = {0};
- msgpack_object map;
- msgpack_unpacked result;
-
- /* Always check if this line is the beginning of a new multiline message */
- ret = flb_parser_do(ctx->mult_parser_firstline,
- buf, len,
- &out_buf, &out_size, &out_time);
- if (ret >= 0) {
- /*
- * The content is a candidate for a firstline, but we need to perform
- * the extra-mandatory check where the last key value type must be
- * a string, otherwise no string concatenation with continuation lines
- * will be possible.
- */
- ret = is_last_key_val_string(out_buf, out_size);
- if (ret == FLB_TRUE)
- file->mult_firstline_append = FLB_TRUE;
- else
- file->mult_firstline_append = FLB_FALSE;
-
- flb_tail_mult_process_first(now, out_buf, out_size, &out_time,
- file, ctx);
- return FLB_TAIL_MULT_MORE;
- }
-
- if (file->mult_skipping == FLB_TRUE) {
- return FLB_TAIL_MULT_MORE;
- }
-
- /*
- * Once here means we have some data that is a continuation, iterate
- * parsers trying to find a match
- */
- out_buf = NULL;
- mk_list_foreach(head, &ctx->mult_parsers) {
- mult_parser = mk_list_entry(head, struct flb_tail_mult, _head);
-
- /* Process line text with current parser */
- out_buf = NULL;
- out_size = 0;
- ret = flb_parser_do(mult_parser->parser,
- buf, len,
- &out_buf, &out_size, &out_time);
- if (ret < 0) {
- mult_parser = NULL;
- continue;
- }
-
- /* The line was processed, break the loop and buffer the data */
- break;
- }
-
- if (!mult_parser) {
- /*
- * If no parser was found means the string log must be appended
- * to the last structured field.
- */
- if (file->mult_firstline && file->mult_firstline_append) {
- flb_tail_mult_append_raw(buf, len, file, ctx);
- }
- else {
- flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes);
- }
-
- return FLB_TAIL_MULT_MORE;
- }
-
- off = 0;
- msgpack_unpacked_init(&result);
- msgpack_unpack_next(&result, out_buf, out_size, &off);
- map = result.data;
-
- /* Append new map to our local msgpack buffer */
- file->mult_keys += map.via.map.size;
- msgpack_unpacked_destroy(&result);
- msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size);
- flb_free(out_buf);
-
- return FLB_TAIL_MULT_MORE;
-}
-
-static int flb_tail_mult_pack_line_body(
- struct flb_log_event_encoder *context,
- struct flb_tail_file *file)
-{
- size_t adjacent_object_offset;
- size_t continuation_length;
- msgpack_unpacked adjacent_object;
- msgpack_unpacked current_object;
- size_t entry_index;
- msgpack_object entry_value;
- msgpack_object entry_key;
- msgpack_object_map *data_map;
- int map_size;
- size_t offset;
- struct flb_tail_config *config;
- int result;
-
- result = FLB_EVENT_ENCODER_SUCCESS;
- config = (struct flb_tail_config *) file->config;
-
- /* New Map size */
- map_size = file->mult_keys;
-
- if (file->config->path_key != NULL) {
- map_size++;
-
- result = flb_log_event_encoder_append_body_values(
- context,
- FLB_LOG_EVENT_CSTRING_VALUE(config->path_key),
- FLB_LOG_EVENT_CSTRING_VALUE(file->name));
- }
-
-
- msgpack_unpacked_init(&current_object);
- msgpack_unpacked_init(&adjacent_object);
-
- offset = 0;
-
- while (result == FLB_EVENT_ENCODER_SUCCESS &&
- msgpack_unpack_next(&current_object,
- file->mult_sbuf.data,
- file->mult_sbuf.size,
- &offset) == MSGPACK_UNPACK_SUCCESS) {
- if (current_object.data.type != MSGPACK_OBJECT_MAP) {
- continue;
- }
-
- data_map = &current_object.data.via.map;
-
- continuation_length = 0;
-
- for (entry_index = 0; entry_index < data_map->size; entry_index++) {
- entry_key = data_map->ptr[entry_index].key;
- entry_value = data_map->ptr[entry_index].val;
-
- result = flb_log_event_encoder_append_body_msgpack_object(context,
- &entry_key);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- break;
- }
-
- /* Check if this is the last entry in the map and if that is
- * the case then add the lengths of all the trailing string
- * objects after the map in order to append them to the value
- * but only if the value object is a string
- */
- if (entry_index + 1 == data_map->size &&
- entry_value.type == MSGPACK_OBJECT_STR) {
- adjacent_object_offset = offset;
-
- while (msgpack_unpack_next(
- &adjacent_object,
- file->mult_sbuf.data,
- file->mult_sbuf.size,
- &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) {
- if (adjacent_object.data.type != MSGPACK_OBJECT_STR) {
- break;
- }
-
- /* Sum total bytes to append */
- continuation_length += adjacent_object.data.via.str.size + 1;
- }
-
- result = flb_log_event_encoder_append_body_string_length(
- context,
- entry_value.via.str.size +
- continuation_length);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- break;
- }
-
- result = flb_log_event_encoder_append_body_string_body(
- context,
- (char *) entry_value.via.str.ptr,
- entry_value.via.str.size);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- break;
- }
-
- if (continuation_length > 0) {
- adjacent_object_offset = offset;
-
- while (msgpack_unpack_next(
- &adjacent_object,
- file->mult_sbuf.data,
- file->mult_sbuf.size,
- &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) {
- if (adjacent_object.data.type != MSGPACK_OBJECT_STR) {
- break;
- }
-
- result = flb_log_event_encoder_append_body_string_body(
- context,
- "\n",
- 1);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- break;
- }
-
- result = flb_log_event_encoder_append_body_string_body(
- context,
- (char *) adjacent_object.data.via.str.ptr,
- adjacent_object.data.via.str.size);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- break;
- }
- }
- }
- }
- else {
- result = flb_log_event_encoder_append_body_msgpack_object(context,
- &entry_value);
- }
- }
- }
-
- msgpack_unpacked_destroy(&current_object);
- msgpack_unpacked_destroy(&adjacent_object);
-
- /* Reset status */
- file->mult_firstline = FLB_FALSE;
- file->mult_skipping = FLB_FALSE;
- file->mult_keys = 0;
- file->mult_flush_timeout = 0;
-
- msgpack_sbuffer_destroy(&file->mult_sbuf);
-
- file->mult_sbuf.data = NULL;
-
- flb_time_zero(&file->mult_time);
-
- return result;
-}
-
-/* Flush any multiline context data into outgoing buffers */
-int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx)
-{
- int result;
-
- /* nothing to flush */
- if (file->mult_firstline == FLB_FALSE) {
- return -1;
- }
-
- if (file->mult_keys == 0) {
- return -1;
- }
-
- result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder);
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_set_timestamp(
- file->ml_log_event_encoder, &file->mult_time);
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_tail_mult_pack_line_body(
- file->ml_log_event_encoder,
- file);
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- result = flb_log_event_encoder_commit_record(
- file->ml_log_event_encoder);
- }
-
- if (result == FLB_EVENT_ENCODER_SUCCESS) {
- flb_input_log_append(ctx->ins,
- file->tag_buf,
- file->tag_len,
- file->ml_log_event_encoder->output_buffer,
- file->ml_log_event_encoder->output_length);
- result = 0;
- }
- else {
- flb_plg_error(file->config->ins, "error packing event : %d", result);
-
- result = -1;
- }
-
- flb_log_event_encoder_reset(file->ml_log_event_encoder);
-
- return result;
-}
-
-static void file_pending_flush(struct flb_tail_config *ctx,
- struct flb_tail_file *file, time_t now)
-{
- if (file->mult_flush_timeout > now) {
- return;
- }
-
- if (file->mult_firstline == FLB_FALSE) {
- if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) {
- return;
- }
- }
-
- flb_tail_mult_flush(file, ctx);
-}
-
-int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx)
-{
- time_t expired;
- struct mk_list *head;
- struct flb_tail_file *file;
-
- expired = time(NULL) + 3600;
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, expired);
- }
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
- file_pending_flush(ctx, file, expired);
- }
-
- return 0;
-}
-
-int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- time_t now;
- struct mk_list *head;
- struct flb_tail_file *file;
- struct flb_tail_config *ctx = context;
-
- now = time(NULL);
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_static) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
-
- file_pending_flush(ctx, file, now);
- }
-
- /* Iterate promoted event files with pending bytes */
- mk_list_foreach(head, &ctx->files_event) {
- file = mk_list_entry(head, struct flb_tail_file, _head);
-
- file_pending_flush(ctx, file, now);
- }
-
- return 0;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_multiline.h b/fluent-bit/plugins/in_tail/tail_multiline.h
deleted file mode 100644
index d7f7539b1..000000000
--- a/fluent-bit/plugins/in_tail/tail_multiline.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_TAIL_MULT_H
-#define FLB_TAIL_TAIL_MULT_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_input.h>
-
-#include "tail_config.h"
-#include "tail_file.h"
-
-#define FLB_TAIL_MULT_NA -1 /* not applicable as a multiline stream */
-#define FLB_TAIL_MULT_DONE 0 /* finished a multiline stream */
-#define FLB_TAIL_MULT_MORE 1 /* expect more lines to come */
-#define FLB_TAIL_MULT_FLUSH "4" /* max flush time for multiline: 4 seconds */
-
-struct flb_tail_mult {
- struct flb_parser *parser;
- struct mk_list _head;
-};
-
-int flb_tail_mult_create(struct flb_tail_config *ctx,
- struct flb_input_instance *ins,
- struct flb_config *config);
-
-int flb_tail_mult_destroy(struct flb_tail_config *ctx);
-
-int flb_tail_mult_process_content(time_t now,
- char *buf, size_t len,
- struct flb_tail_file *file,
- struct flb_tail_config *ctx,
- size_t processed_bytes);
-int flb_tail_mult_flush(struct flb_tail_file *file,
- struct flb_tail_config *ctx);
-
-int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
- struct flb_config *config, void *context);
-int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_scan.c b/fluent-bit/plugins/in_tail/tail_scan.c
deleted file mode 100644
index ccb8e070a..000000000
--- a/fluent-bit/plugins/in_tail/tail_scan.c
+++ /dev/null
@@ -1,71 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_input_plugin.h>
-#include "tail.h"
-#include "tail_config.h"
-
-/*
- * Include proper scan backend
- */
-#ifdef FLB_SYSTEM_WINDOWS
-#include "tail_scan_win32.c"
-#else
-#include "tail_scan_glob.c"
-#endif
-
-int flb_tail_scan(struct mk_list *path_list, struct flb_tail_config *ctx)
-{
- int ret;
- struct mk_list *head;
- struct flb_slist_entry *pattern;
-
- mk_list_foreach(head, path_list) {
- pattern = mk_list_entry(head, struct flb_slist_entry, _head);
- ret = tail_scan_path(pattern->str, ctx);
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "error scanning path: %s", pattern->str);
- }
- else {
- flb_plg_debug(ctx->ins, "%i new files found on path '%s'",
- ret, pattern->str);
- }
- }
-
- return 0;
-}
-
-/*
- * Triggered by refresh_interval, it re-scan the path looking for new files
- * that match the original path pattern.
- */
-int flb_tail_scan_callback(struct flb_input_instance *ins,
- struct flb_config *config, void *context)
-{
- int ret;
- struct flb_tail_config *ctx = context;
- (void) config;
-
- ret = flb_tail_scan(ctx->path_list, ctx);
- if (ret > 0) {
- flb_plg_debug(ins, "%i new files found", ret);
- }
-
- return ret;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_scan.h b/fluent-bit/plugins/in_tail/tail_scan.h
deleted file mode 100644
index ec3c96a2a..000000000
--- a/fluent-bit/plugins/in_tail/tail_scan.h
+++ /dev/null
@@ -1,29 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_SCAN_H
-#define FLB_TAIL_SCAN_H
-
-#include "tail_config.h"
-
-int flb_tail_scan(struct mk_list *path, struct flb_tail_config *ctx);
-int flb_tail_scan_callback(struct flb_input_instance *ins,
- struct flb_config *config, void *context);
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_scan_glob.c b/fluent-bit/plugins/in_tail/tail_scan_glob.c
deleted file mode 100644
index b330b7c3b..000000000
--- a/fluent-bit/plugins/in_tail/tail_scan_glob.c
+++ /dev/null
@@ -1,278 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <glob.h>
-#include <fnmatch.h>
-
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_utils.h>
-
-#include "tail.h"
-#include "tail_file.h"
-#include "tail_signal.h"
-#include "tail_scan.h"
-#include "tail_config.h"
-
-/* Define missing GLOB_TILDE if not exists */
-#ifndef GLOB_TILDE
-#define GLOB_TILDE 1<<2 /* use GNU Libc value */
-#define UNSUP_TILDE 1
-
-/* we need these extra headers for path resolution */
-#include <limits.h>
-#include <sys/types.h>
-#include <pwd.h>
-
-static char *expand_tilde(const char *path)
-{
- int len;
- char user[256];
- char *p = NULL;
- char *dir = NULL;
- char *tmp = NULL;
- struct passwd *uinfo = NULL;
-
- if (path[0] == '~') {
- p = strchr(path, '/');
-
- if (p) {
- /* check case '~/' */
- if ((p - path) == 1) {
- dir = getenv("HOME");
- if (!dir) {
- return path;
- }
- }
- else {
- /*
- * it refers to a different user: ~user/abc, first step grab
- * the user name.
- */
- len = (p - path) - 1;
- memcpy(user, path + 1, len);
- user[len] = '\0';
-
- /* use getpwnam() to resolve user information */
- uinfo = getpwnam(user);
- if (!uinfo) {
- return path;
- }
-
- dir = uinfo->pw_dir;
- }
- }
- else {
- dir = getenv("HOME");
- if (!dir) {
- return path;
- }
- }
-
- if (p) {
- tmp = flb_malloc(PATH_MAX);
- if (!tmp) {
- flb_errno();
- return NULL;
- }
- snprintf(tmp, PATH_MAX - 1, "%s%s", dir, p);
- }
- else {
- dir = getenv("HOME");
- if (!dir) {
- return path;
- }
-
- tmp = flb_strdup(dir);
- if (!tmp) {
- return path;
- }
- }
-
- return tmp;
- }
-
- return path;
-}
-#endif
-
-static int tail_is_excluded(char *path, struct flb_tail_config *ctx)
-{
- struct mk_list *head;
- struct flb_slist_entry *pattern;
-
- if (!ctx->exclude_list) {
- return FLB_FALSE;
- }
-
- mk_list_foreach(head, ctx->exclude_list) {
- pattern = mk_list_entry(head, struct flb_slist_entry, _head);
- if (fnmatch(pattern->str, path, 0) == 0) {
- return FLB_TRUE;
- }
- }
-
- return FLB_FALSE;
-}
-
-static inline int do_glob(const char *pattern, int flags,
- void *not_used, glob_t *pglob)
-{
- int ret;
- int new_flags;
- char *tmp = NULL;
- int tmp_needs_free = FLB_FALSE;
- (void) not_used;
-
- /* Save current values */
- new_flags = flags;
-
- if (flags & GLOB_TILDE) {
-#ifdef UNSUP_TILDE
- /*
- * Some libc libraries like Musl do not support GLOB_TILDE for tilde
- * expansion. A workaround is to use wordexp(3) but looking at it
- * implementation in Musl it looks quite expensive:
- *
- * http://git.musl-libc.org/cgit/musl/tree/src/misc/wordexp.c
- *
- * the workaround is to do our own tilde expansion in a temporary buffer.
- */
-
- /* Look for a tilde */
- tmp = expand_tilde(pattern);
- if (tmp != pattern) {
- /* the path was expanded */
- pattern = tmp;
- tmp_needs_free = FLB_TRUE;
- }
-
- /* remove unused flag */
- new_flags &= ~GLOB_TILDE;
-#endif
- }
-
- /* invoke glob with new parameters */
- ret = glob(pattern, new_flags, NULL, pglob);
-
- /* remove temporary buffer, if allocated by expand_tilde above.
- * Note that this buffer is only used for libc implementations
- * that do not support the GLOB_TILDE flag, like musl. */
- if ((tmp != NULL) && (tmp_needs_free == FLB_TRUE)) {
- flb_free(tmp);
- }
-
- return ret;
-}
-
-/* Scan a path, register the entries and return how many */
-static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
-{
- int i;
- int ret;
- int count = 0;
- glob_t globbuf;
- time_t now;
- int64_t mtime;
- struct stat st;
-
- flb_plg_debug(ctx->ins, "scanning path %s", path);
-
- /* Safe reset for globfree() */
- globbuf.gl_pathv = NULL;
-
- /* Scan the given path */
- ret = do_glob(path, GLOB_TILDE | GLOB_ERR, NULL, &globbuf);
- if (ret != 0) {
- switch (ret) {
- case GLOB_NOSPACE:
- flb_plg_error(ctx->ins, "no memory space available");
- return -1;
- case GLOB_ABORTED:
- flb_plg_error(ctx->ins, "read error, check permissions: %s", path);
- return -1;
- case GLOB_NOMATCH:
- ret = stat(path, &st);
- if (ret == -1) {
- flb_plg_debug(ctx->ins, "cannot read info from: %s", path);
- }
- else {
- ret = access(path, R_OK);
- if (ret == -1 && errno == EACCES) {
- flb_plg_error(ctx->ins, "NO read access for path: %s", path);
- }
- else {
- flb_plg_debug(ctx->ins, "NO matches for path: %s", path);
- }
- }
- return 0;
- }
- }
-
-
- /* For every entry found, generate an output list */
- now = time(NULL);
- for (i = 0; i < globbuf.gl_pathc; i++) {
- ret = stat(globbuf.gl_pathv[i], &st);
- if (ret == 0 && S_ISREG(st.st_mode)) {
- /* Check if this file is blacklisted */
- if (tail_is_excluded(globbuf.gl_pathv[i], ctx) == FLB_TRUE) {
- flb_plg_debug(ctx->ins, "excluded=%s", globbuf.gl_pathv[i]);
- continue;
- }
-
- if (ctx->ignore_older > 0) {
- mtime = flb_tail_stat_mtime(&st);
- if (mtime > 0) {
- if ((now - ctx->ignore_older) > mtime) {
- flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
- globbuf.gl_pathv[i]);
- continue;
- }
- }
- }
-
- /* Append file to list */
- ret = flb_tail_file_append(globbuf.gl_pathv[i], &st,
- FLB_TAIL_STATIC, ctx);
- if (ret == 0) {
- flb_plg_debug(ctx->ins, "scan_glob add(): %s, inode %li",
- globbuf.gl_pathv[i], st.st_ino);
- count++;
- }
- else {
- flb_plg_debug(ctx->ins, "scan_blog add(): dismissed: %s, inode %li",
- globbuf.gl_pathv[i], st.st_ino);
- }
- }
- else {
- flb_plg_debug(ctx->ins, "skip (invalid) entry=%s",
- globbuf.gl_pathv[i]);
- }
- }
-
- if (count > 0) {
- tail_signal_manager(ctx);
- }
-
- globfree(&globbuf);
- return count;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_scan_win32.c b/fluent-bit/plugins/in_tail/tail_scan_win32.c
deleted file mode 100644
index 94733f065..000000000
--- a/fluent-bit/plugins/in_tail/tail_scan_win32.c
+++ /dev/null
@@ -1,245 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * This file implements glob-like patch matching feature for Windows
- * based on Win32 API.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_utils.h>
-
-#include <shlwapi.h>
-
-#include "tail.h"
-#include "tail_file.h"
-#include "tail_signal.h"
-#include "tail_config.h"
-
-#include "win32.h"
-
-static int tail_is_excluded(char *path, struct flb_tail_config *ctx)
-{
- struct mk_list *head;
- struct flb_slist_entry *pattern;
-
- if (!ctx->exclude_list) {
- return FLB_FALSE;
- }
-
- mk_list_foreach(head, ctx->exclude_list) {
- pattern = mk_list_entry(head, struct flb_slist_entry, _head);
- if (PathMatchSpecA(path, pattern->str)) {
- return FLB_TRUE;
- }
- }
-
- return FLB_FALSE;
-}
-
-/*
- * This function is a thin wrapper over flb_tail_file_append(),
- * adding normalization and sanity checks on top of it.
- */
-static int tail_register_file(const char *target, struct flb_tail_config *ctx,
- time_t ts)
-{
- int64_t mtime;
- struct stat st;
- char path[MAX_PATH];
-
- if (_fullpath(path, target, MAX_PATH) == NULL) {
- flb_plg_error(ctx->ins, "cannot get absolute path of %s", target);
- return -1;
- }
-
- if (stat(path, &st) != 0 || !S_ISREG(st.st_mode)) {
- return -1;
- }
-
- if (ctx->ignore_older > 0) {
- mtime = flb_tail_stat_mtime(&st);
- if (mtime > 0) {
- if ((ts - ctx->ignore_older) > mtime) {
- flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
- target);
- return -1;
- }
- }
- }
-
- if (tail_is_excluded(path, ctx) == FLB_TRUE) {
- flb_plg_trace(ctx->ins, "skip '%s' (excluded)", path);
- return -1;
- }
-
- return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ctx);
-}
-
-/*
- * Perform patern match on the given path string. This function
- * supports patterns with "nested" wildcards like below.
- *
- * tail_scan_pattern("C:\fluent-bit\*\*.txt", ctx);
- *
- * On success, the number of files found is returned (zero indicates
- * "no file found"). On error, -1 is returned.
- */
-static int tail_scan_pattern(const char *path, struct flb_tail_config *ctx)
-{
- char *star, *p0, *p1;
- char pattern[MAX_PATH];
- char buf[MAX_PATH];
- int ret;
- int n_added = 0;
- time_t now;
- int64_t mtime;
- HANDLE h;
- WIN32_FIND_DATA data;
-
- if (strlen(path) > MAX_PATH - 1) {
- flb_plg_error(ctx->ins, "path too long '%s'");
- return -1;
- }
-
- star = strchr(path, '*');
- if (star == NULL) {
- return -1;
- }
-
- /*
- * C:\data\tmp\input_*.conf
- * 0<-----|
- */
- p0 = star;
- while (path <= p0 && *p0 != '\\') {
- p0--;
- }
-
- /*
- * C:\data\tmp\input_*.conf
- * |---->1
- */
- p1 = star;
- while (*p1 && *p1 != '\\') {
- p1++;
- }
-
- memcpy(pattern, path, (p1 - path));
- pattern[p1 - path] = '\0';
-
- h = FindFirstFileA(pattern, &data);
- if (h == INVALID_HANDLE_VALUE) {
- return 0; /* none matched */
- }
-
- now = time(NULL);
- do {
- /* Ignore the current and parent dirs */
- if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) {
- continue;
- }
-
- /* Avoid an infinite loop */
- if (strchr(data.cFileName, '*')) {
- continue;
- }
-
- /* Create a path (prefix + filename + suffix) */
- memcpy(buf, path, p0 - path + 1);
- buf[p0 - path + 1] = '\0';
-
- if (strlen(buf) + strlen(data.cFileName) + strlen(p1) > MAX_PATH - 1) {
- flb_plg_warn(ctx->ins, "'%s%s%s' is too long", buf, data.cFileName, p1);
- continue;
- }
- strcat(buf, data.cFileName);
- strcat(buf, p1);
-
- if (strchr(p1, '*')) {
- ret = tail_scan_pattern(buf, ctx); /* recursive */
- if (ret >= 0) {
- n_added += ret;
- }
- continue;
- }
-
- /* Try to register the target file */
- ret = tail_register_file(buf, ctx, now);
- if (ret == 0) {
- n_added++;
- }
- } while (FindNextFileA(h, &data) != 0);
-
- FindClose(h);
- return n_added;
-}
-
-static int tail_filepath(char *buf, int len, const char *basedir, const char *filename)
-{
- char drive[_MAX_DRIVE];
- char dir[_MAX_DIR];
- char fname[_MAX_FNAME];
- char ext[_MAX_EXT];
- char tmp[MAX_PATH];
- int ret;
-
- ret = _splitpath_s(basedir, drive, _MAX_DRIVE, dir, _MAX_DIR, NULL, 0, NULL, 0);
- if (ret) {
- return -1;
- }
-
- ret = _splitpath_s(filename, NULL, 0, NULL, 0, fname, _MAX_FNAME, ext, _MAX_EXT);
- if (ret) {
- return -1;
- }
-
- ret = _makepath_s(tmp, MAX_PATH, drive, dir, fname, ext);
- if (ret) {
- return -1;
- }
-
- if (_fullpath(buf, tmp, len) == NULL) {
- return -1;
- }
-
- return 0;
-}
-
-static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
-{
- int ret;
- int n_added = 0;
- time_t now;
-
- if (strchr(path, '*')) {
- return tail_scan_pattern(path, ctx);
- }
-
- /* No wildcard involved. Let's just handle the file... */
- now = time(NULL);
- ret = tail_register_file(path, ctx, now);
- if (ret == 0) {
- n_added++;
- }
-
- return n_added;
-}
diff --git a/fluent-bit/plugins/in_tail/tail_signal.h b/fluent-bit/plugins/in_tail/tail_signal.h
deleted file mode 100644
index 1a81fec64..000000000
--- a/fluent-bit/plugins/in_tail/tail_signal.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_SIGNAL_H
-#define FLB_TAIL_SIGNAL_H
-
-#include "tail_config.h"
-
-static inline int tail_signal_manager(struct flb_tail_config *ctx)
-{
- int n;
- uint64_t val = 0xc001;
-
- /*
- * The number of signal reads might be less than the written signals, this
- * means that some event is still pending in the queue. On that case we
- * don't need to signal it again.
- */
- if (ctx->ch_reads < ctx->ch_writes) {
- return 1;
- }
-
- /* Reset counters: prevent an overflow, unlikely..but let's keep safe */
- if (ctx->ch_reads == ctx->ch_writes) {
- ctx->ch_reads = 0;
- ctx->ch_writes = 0;
- }
-
- /* Insert a dummy event into the channel manager */
- n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val));
- if (n == -1) {
- flb_errno();
- return -1;
- }
- else {
- ctx->ch_writes++;
- }
-
- return n;
-}
-
-static inline int tail_signal_pending(struct flb_tail_config *ctx)
-{
- int n;
- uint64_t val = 0xc002;
-
- /* Insert a dummy event into the 'pending' channel */
- n = flb_pipe_w(ctx->ch_pending[1], (const char *) &val, sizeof(val));
-
- /*
- * If we get EAGAIN, it simply means pending channel is full. As
- * notification is already pending, it's safe to ignore.
- */
- if (n == -1 && !FLB_PIPE_WOULDBLOCK()) {
- flb_errno();
- return -1;
- }
-
- return n;
-}
-
-static inline int tail_consume_pending(struct flb_tail_config *ctx)
-{
- int ret;
- uint64_t val;
-
- /*
- * We need to consume the pending bytes. Loop until we would have
- * blocked (pipe is empty).
- */
- do {
- ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val));
- if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) {
- flb_errno();
- return -1;
- }
- } while (!FLB_PIPE_WOULDBLOCK());
-
- return 0;
-}
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/tail_sql.h b/fluent-bit/plugins/in_tail/tail_sql.h
deleted file mode 100644
index 855933a01..000000000
--- a/fluent-bit/plugins/in_tail/tail_sql.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_SQL_H
-#define FLB_TAIL_SQL_H
-
-/*
- * In Fluent Bit we try to have a common convention for table names,
- * if the table belong to an input/output plugin, use plugin name
- * plus to what it's about, e.g:
- *
- * in_tail plugin table to track files: in_tail_files
- */
-#define SQL_CREATE_FILES \
- "CREATE TABLE IF NOT EXISTS in_tail_files (" \
- " id INTEGER PRIMARY KEY," \
- " name TEXT NOT NULL," \
- " offset INTEGER," \
- " inode INTEGER," \
- " created INTEGER," \
- " rotated INTEGER DEFAULT 0" \
- ");"
-
-#define SQL_GET_FILE \
- "SELECT * from in_tail_files WHERE inode=@inode order by id desc;"
-
-#define SQL_INSERT_FILE \
- "INSERT INTO in_tail_files (name, offset, inode, created)" \
- " VALUES (@name, @offset, @inode, @created);"
-
-#define SQL_ROTATE_FILE \
- "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;"
-
-#define SQL_UPDATE_OFFSET \
- "UPDATE in_tail_files set offset=@offset WHERE id=@id;"
-
-#define SQL_DELETE_FILE \
- "DELETE FROM in_tail_files WHERE id=@id;"
-
-#define SQL_PRAGMA_SYNC \
- "PRAGMA synchronous=%i;"
-
-#define SQL_PRAGMA_JOURNAL_MODE \
- "PRAGMA journal_mode=%s;"
-
-#define SQL_PRAGMA_LOCKING_MODE \
- "PRAGMA locking_mode=EXCLUSIVE;"
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/win32.h b/fluent-bit/plugins/in_tail/win32.h
deleted file mode 100644
index a9414f892..000000000
--- a/fluent-bit/plugins/in_tail/win32.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * This is the interface file that replaces POSIX functions
- * with our own custom implementation.
- */
-
-#ifndef FLB_TAIL_WIN32_H
-#define FLB_TAIL_WIN32_H
-
-#include "win32/interface.h"
-
-#undef open
-#undef stat
-#undef lstat
-#undef fstat
-#undef lseek
-
-#undef S_IFDIR
-#undef S_IFCHR
-#undef S_IFIFO
-#undef S_IFREG
-#undef S_IFLNK
-#undef S_IFMT
-#undef S_ISDIR
-#undef S_ISCHR
-#undef S_ISFIFO
-#undef S_ISREG
-#undef S_ISLNK
-
-#define open win32_open
-#define stat win32_stat
-#define lstat win32_lstat
-#define fstat win32_fstat
-
-#define lseek _lseeki64
-
-#define S_IFDIR WIN32_S_IFDIR
-#define S_IFCHR WIN32_S_IFCHR
-#define S_IFIFO WIN32_S_IFIFO
-#define S_IFREG WIN32_S_IFREG
-#define S_IFLNK WIN32_S_IFLNK
-#define S_IFMT WIN32_S_IFMT
-
-#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR)
-#define S_ISCHR(m) (((m) & S_IFMT) == S_IFCHR)
-#define S_ISIFO(m) (((m) & S_IFMT) == S_IFIFO)
-#define S_ISREG(m) (((m) & S_IFMT) == S_IFREG)
-#define S_ISLNK(m) (((m) & S_IFMT) == S_IFLNK)
-#endif
diff --git a/fluent-bit/plugins/in_tail/win32/interface.h b/fluent-bit/plugins/in_tail/win32/interface.h
deleted file mode 100644
index 73b2ef233..000000000
--- a/fluent-bit/plugins/in_tail/win32/interface.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_TAIL_WIN32_INTERFACE_H
-#define FLB_TAIL_WIN32_INTERFACE_H
-
-struct win32_stat {
- uint64_t st_ino;
- uint16_t st_mode;
- int64_t st_mtime;
- int16_t st_nlink;
- int64_t st_size;
-};
-
-int win32_stat(const char *path, struct win32_stat *wst);
-int win32_lstat(const char *path, struct win32_stat *wst);
-int win32_fstat(int fd, struct win32_stat *wst);
-
-int win32_open(const char *path, int flags);
-
-#define WIN32_S_IFDIR 0x1000
-#define WIN32_S_IFCHR 0x2000
-#define WIN32_S_IFIFO 0x4000
-#define WIN32_S_IFREG 0x8000
-#define WIN32_S_IFLNK 0xc000
-#define WIN32_S_IFMT 0xf000
-
-#endif
diff --git a/fluent-bit/plugins/in_tail/win32/io.c b/fluent-bit/plugins/in_tail/win32/io.c
deleted file mode 100644
index 45928b04a..000000000
--- a/fluent-bit/plugins/in_tail/win32/io.c
+++ /dev/null
@@ -1,47 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <Windows.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <fcntl.h>
-#include <io.h>
-#include "interface.h"
-
-/*
- * POSIX IO emulation tailored for in_tail's usage.
- *
- * open(2) that does not acquire an exclusive lock.
- */
-
-int win32_open(const char *path, int flags)
-{
- HANDLE h;
- h = CreateFileA(path,
- GENERIC_READ,
- FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
- NULL, /* lpSecurityAttributes */
- OPEN_EXISTING, /* dwCreationDisposition */
- 0, /* dwFlagsAndAttributes */
- NULL); /* hTemplateFile */
- if (h == INVALID_HANDLE_VALUE) {
- return -1;
- }
- return _open_osfhandle((intptr_t) h, _O_RDONLY);
-}
diff --git a/fluent-bit/plugins/in_tail/win32/stat.c b/fluent-bit/plugins/in_tail/win32/stat.c
deleted file mode 100644
index bce802749..000000000
--- a/fluent-bit/plugins/in_tail/win32/stat.c
+++ /dev/null
@@ -1,332 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <Windows.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <io.h>
-#include "interface.h"
-
-/*
- * NTFS stat(2) emulation tailored for in_tail's usage.
- *
- * (1) Support st_ino (inode) for Windows NTFS.
- * (2) Support NTFS symlinks.
- * (3) Support large files >= 2GB.
- *
- * To use it, include "win32.h" and it will transparently
- * replace stat(), lstat() and fstat().
- */
-
-#define UINT64(high, low) ((uint64_t) (high) << 32 | (low))
-#define WINDOWS_TICKS_TO_SECONDS_RATIO 10000000
-#define WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA 11644473600
-
-/*
- * FILETIME timestamps are represented in 100-nanosecond intervals,
- * because of this, that's why we need to divide the number by 10000000
- * in order to convert it to seconds.
- *
- * While UNIX timestamps use January 1, 1970 as epoch Windows FILETIME
- * timestamps use January 1, 1601. Because of this we need to subtract
- * 11644473600 seconds to account for it.
- *
- * Note: Even though this does not account for leap seconds it should be
- * accurate enough.
- */
-
-static uint64_t filetime_to_epoch(FILETIME *ft)
-{
- ULARGE_INTEGER timestamp;
-
- if (ft == NULL) {
- return 0;
- }
-
- timestamp.HighPart = ft->dwHighDateTime;
- timestamp.LowPart = ft->dwLowDateTime;
-
- timestamp.QuadPart /= WINDOWS_TICKS_TO_SECONDS_RATIO;
- timestamp.QuadPart -= WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA;
-
- return timestamp.QuadPart;
-}
-
-static void reset_errno()
-{
- errno = 0;
-}
-
-static void propagate_last_error_to_errno()
-{
- DWORD error_code;
-
- error_code = GetLastError();
-
- switch (error_code) {
- case ERROR_INVALID_TARGET_HANDLE:
- case ERROR_INVALID_HANDLE:
- errno = EBADF;
- break;
-
- case ERROR_TOO_MANY_OPEN_FILES:
- errno = EMFILE;
- break;
-
- case ERROR_INVALID_FLAG_NUMBER:
- case ERROR_INVALID_PARAMETER:
- errno = EINVAL;
- break;
-
- case ERROR_NOT_ENOUGH_MEMORY:
- case ERROR_OUTOFMEMORY:
- errno = ENOMEM;
- break;
-
- case ERROR_SHARING_VIOLATION:
- case ERROR_LOCK_VIOLATION:
- case ERROR_PATH_BUSY:
- case ERROR_BUSY:
- errno = EBUSY;
- break;
-
- case ERROR_HANDLE_DISK_FULL:
- case ERROR_DISK_FULL:
- errno = ENOSPC;
- break;
-
- case ERROR_INVALID_ADDRESS:
- errno = EFAULT;
- break;
-
- case ERROR_FILE_TOO_LARGE:
- errno = EFBIG;
- break;
-
- case ERROR_ALREADY_EXISTS:
- case ERROR_FILE_EXISTS:
- errno = EEXIST;
- break;
-
- case ERROR_FILE_NOT_FOUND:
- case ERROR_PATH_NOT_FOUND:
- case ERROR_INVALID_DRIVE:
- case ERROR_BAD_PATHNAME:
- case ERROR_INVALID_NAME:
- case ERROR_BAD_UNIT:
- errno = ENOENT;
- break;
-
- case ERROR_SEEK_ON_DEVICE:
- case ERROR_NEGATIVE_SEEK:
- errno = ESPIPE;
- break;
-
- case ERROR_ACCESS_DENIED:
- errno = EACCES;
- break;
-
- case ERROR_DIR_NOT_EMPTY:
- errno = ENOTEMPTY;
- break;
-
- case ERROR_BROKEN_PIPE:
- errno = EPIPE;
- break;
-
- case ERROR_GEN_FAILURE:
- errno = EIO;
- break;
-
- case ERROR_OPEN_FAILED:
- errno = EIO;
- break;
-
- case ERROR_SUCCESS:
- errno = 0;
- break;
-
- default:
- /* This is just a canary, if you find this
- * error then it means we need to expand the
- * translation list.
- */
-
- errno = EOWNERDEAD;
- break;
- }
-}
-
-static int get_mode(unsigned int attr)
-{
- if (attr & FILE_ATTRIBUTE_DIRECTORY) {
- return WIN32_S_IFDIR;
- }
- return WIN32_S_IFREG;
-}
-
-
-
-static int is_symlink(const char *path)
-{
- WIN32_FIND_DATA data;
- HANDLE h;
-
- SetLastError(0);
- reset_errno();
-
- h = FindFirstFileA(path, &data);
-
- if (h == INVALID_HANDLE_VALUE) {
- propagate_last_error_to_errno();
-
- return 0;
- }
-
- FindClose(h);
-
- /*
- * A NTFS symlink is a file with a bit of metadata ("reparse point"),
- * So (1) check if the file has metadata and then (2) confirm that
- * it is indeed a symlink.
- */
- if (data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
- if (data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) {
- return 1;
- }
- }
-
- return 0;
-}
-
-static int hstat(HANDLE h, struct win32_stat *wst)
-{
- BY_HANDLE_FILE_INFORMATION info;
- FILE_STANDARD_INFO std;
-
- SetLastError(0);
- reset_errno();
-
- if (!GetFileInformationByHandle(h, &info)) {
- propagate_last_error_to_errno();
-
- return -1;
- }
-
- if (!GetFileInformationByHandleEx(h, FileStandardInfo,
- &std, sizeof(std))) {
- propagate_last_error_to_errno();
-
- return -1;
- }
-
- wst->st_nlink = std.NumberOfLinks;
- if (std.DeletePending) {
- wst->st_nlink = 0;
- }
-
- wst->st_mode = get_mode(info.dwFileAttributes);
- wst->st_size = UINT64(info.nFileSizeHigh, info.nFileSizeLow);
- wst->st_ino = UINT64(info.nFileIndexHigh, info.nFileIndexLow);
- wst->st_mtime = filetime_to_epoch(&info.ftLastWriteTime);
-
- return 0;
-}
-
-int win32_stat(const char *path, struct win32_stat *wst)
-{
- HANDLE h;
-
- SetLastError(0);
- reset_errno();
-
- h = CreateFileA(path,
- GENERIC_READ,
- FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
- NULL, /* lpSecurityAttributes */
- OPEN_EXISTING, /* dwCreationDisposition */
- 0, /* dwFlagsAndAttributes */
- NULL); /* hTemplateFile */
-
- if (h == INVALID_HANDLE_VALUE) {
- propagate_last_error_to_errno();
-
- return -1;
- }
-
- if (hstat(h, wst)) {
- CloseHandle(h);
- return -1;
- }
-
- CloseHandle(h);
- return 0;
-}
-
-int win32_lstat(const char *path, struct win32_stat *wst)
-{
- HANDLE h;
-
- SetLastError(0);
- reset_errno();
-
- h = CreateFileA(path,
- GENERIC_READ,
- FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
- NULL, /* lpSecurityAttributes */
- OPEN_EXISTING, /* dwCreationDisposition */
- FILE_FLAG_OPEN_REPARSE_POINT,
- NULL); /* hTemplateFile */
-
- if (h == INVALID_HANDLE_VALUE) {
- propagate_last_error_to_errno();
-
- return -1;
- }
-
- if (hstat(h, wst)) {
- CloseHandle(h);
- return -1;
- }
-
- if (is_symlink(path)) {
- wst->st_mode = WIN32_S_IFLNK;
- }
-
- CloseHandle(h);
- return 0;
-}
-
-int win32_fstat(int fd, struct win32_stat *wst)
-{
- HANDLE h;
-
- SetLastError(0);
- reset_errno();
-
- h = (HANDLE) _get_osfhandle(fd);
-
- if (h == INVALID_HANDLE_VALUE) {
- propagate_last_error_to_errno();
-
- return -1;
- }
-
- return hstat(h, wst);
-}