summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_tail
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/plugins/in_tail
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/plugins/in_tail')
-rw-r--r--src/fluent-bit/plugins/in_tail/CMakeLists.txt37
-rw-r--r--src/fluent-bit/plugins/in_tail/tail.c783
-rw-r--r--src/fluent-bit/plugins/in_tail/tail.h45
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_config.c472
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_config.h168
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_db.c277
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_db.h43
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_dockermode.c459
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_dockermode.h38
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_file.c1860
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_file.h137
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_file_internal.h130
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_fs.h96
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_fs_inotify.c433
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_fs_inotify.h37
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_fs_stat.c253
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_fs_stat.h37
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_multiline.c606
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_multiline.h57
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_scan.c71
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_scan.h29
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_scan_glob.c278
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_scan_win32.c245
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_signal.h98
-rw-r--r--src/fluent-bit/plugins/in_tail/tail_sql.h65
-rw-r--r--src/fluent-bit/plugins/in_tail/win32.h67
-rw-r--r--src/fluent-bit/plugins/in_tail/win32/interface.h44
-rw-r--r--src/fluent-bit/plugins/in_tail/win32/io.c47
-rw-r--r--src/fluent-bit/plugins/in_tail/win32/stat.c332
29 files changed, 7244 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_tail/CMakeLists.txt b/src/fluent-bit/plugins/in_tail/CMakeLists.txt
new file mode 100644
index 000000000..31d865218
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/CMakeLists.txt
@@ -0,0 +1,37 @@
+set(src
+ tail_file.c
+ tail_dockermode.c
+ tail_scan.c
+ tail_config.c
+ tail_fs_stat.c
+ tail.c)
+
+if(FLB_HAVE_INOTIFY)
+set(src
+ ${src}
+ tail_fs_inotify.c)
+endif()
+
+if(FLB_SQLDB)
+set(src
+ ${src}
+ tail_db.c)
+endif()
+
+if(FLB_PARSER)
+ set(src
+ ${src}
+ tail_multiline.c
+ )
+endif()
+
+if(MSVC)
+ set(src
+ ${src}
+ win32/stat.c
+ win32/io.c
+ )
+ FLB_PLUGIN(in_tail "${src}" "Shlwapi")
+else()
+ FLB_PLUGIN(in_tail "${src}" "")
+endif()
diff --git a/src/fluent-bit/plugins/in_tail/tail.c b/src/fluent-bit/plugins/in_tail/tail.c
new file mode 100644
index 000000000..34a0fec3d
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail.c
@@ -0,0 +1,783 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "tail.h"
+#include "tail_fs.h"
+#include "tail_db.h"
+#include "tail_file.h"
+#include "tail_scan.h"
+#include "tail_signal.h"
+#include "tail_config.h"
+#include "tail_dockermode.h"
+#include "tail_multiline.h"
+
+static inline int consume_byte(flb_pipefd_t fd)
+{
+ int ret;
+ uint64_t val;
+
+ /* We need to consume the byte */
+ ret = flb_pipe_r(fd, (char *) &val, sizeof(val));
+ if (ret <= 0) {
+ flb_errno();
+ return -1;
+ }
+
+ return 0;
+}
+
+/* cb_collect callback */
+static int in_tail_collect_pending(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ int active = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_config *ctx = in_context;
+ struct flb_tail_file *file;
+ struct stat st;
+ uint64_t pre;
+ uint64_t total_processed = 0;
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+
+ if (file->watch_fd == -1 ||
+ (file->offset >= file->size)) {
+ ret = fstat(file->fd, &st);
+
+ if (ret == -1) {
+ flb_errno();
+ flb_tail_file_remove(file);
+ continue;
+ }
+
+ file->size = st.st_size;
+ file->pending_bytes = (file->size - file->offset);
+ }
+ else {
+ memset(&st, 0, sizeof(struct stat));
+ }
+
+ if (file->pending_bytes <= 0) {
+ file->pending_bytes = (file->size - file->offset);
+ }
+
+ if (file->pending_bytes <= 0) {
+ continue;
+ }
+
+ if (ctx->event_batch_size > 0 &&
+ total_processed >= ctx->event_batch_size) {
+ break;
+ }
+
+ /* get initial offset to calculate the number of processed bytes later */
+ pre = file->offset;
+
+ ret = flb_tail_file_chunk(file);
+
+ /* Update the total number of bytes processed */
+ if (file->offset > pre) {
+ total_processed += (file->offset - pre);
+ }
+
+ switch (ret) {
+ case FLB_TAIL_ERROR:
+ /* Could not longer read the file */
+ flb_tail_file_remove(file);
+ break;
+ case FLB_TAIL_OK:
+ case FLB_TAIL_BUSY:
+ /*
+ * Adjust counter to verify if we need a further read(2) later.
+ * For more details refer to tail_fs_inotify.c:96.
+ */
+ if (file->offset < file->size) {
+ file->pending_bytes = (file->size - file->offset);
+ active++;
+ }
+ else {
+ file->pending_bytes = 0;
+ }
+ break;
+ }
+ }
+
+ /* If no more active files, consume pending signal so we don't get called again. */
+ if (active == 0) {
+ tail_consume_pending(ctx);
+ }
+
+ return 0;
+}
+
+/* cb_collect callback */
+static int in_tail_collect_static(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ int active = 0;
+ int pre_size;
+ int pos_size;
+ int alter_size = 0;
+ int completed = FLB_FALSE;
+ char s_size[32];
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_config *ctx = in_context;
+ struct flb_tail_file *file;
+ uint64_t pre;
+ uint64_t total_processed = 0;
+
+ /* Do a data chunk collection for each file */
+ mk_list_foreach_safe(head, tmp, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+
+ /*
+ * The list 'files_static' represents all the files that were discovered
+ * on startup that already contains data: these are called 'static files'.
+ *
+ * When processing static files, we don't know what kind of content they
+ * have and what kind of 'latency' might add to process all of them in
+ * a row. Despite we always 'try' to do a full round and process a
+ * fraction of them on every invocation of this function if we have a
+ * huge number of files we will face latency and make the main pipeline
+ * to degrade performance.
+ *
+ * In order to avoid this situation, we added a new option to the plugin
+ * called 'static_batch_size' which basically defines how many bytes can
+ * be processed on every invocation to process the static files.
+ *
+ * When the limit is reached, we just break the loop and as a side effect
+ * we allow other events keep processing.
+ */
+ if (ctx->static_batch_size > 0 &&
+ total_processed >= ctx->static_batch_size) {
+ break;
+ }
+
+ /* get initial offset to calculate the number of processed bytes later */
+ pre = file->offset;
+
+ /* Process the file */
+ ret = flb_tail_file_chunk(file);
+
+ /* Update the total number of bytes processed */
+ if (file->offset > pre) {
+ total_processed += (file->offset - pre);
+ }
+
+ switch (ret) {
+ case FLB_TAIL_ERROR:
+ /* Could not longer read the file */
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" collect static ERROR",
+ file->inode);
+ flb_tail_file_remove(file);
+ break;
+ case FLB_TAIL_OK:
+ case FLB_TAIL_BUSY:
+ active++;
+ break;
+ case FLB_TAIL_WAIT:
+ if (file->config->exit_on_eof) {
+ flb_plg_info(ctx->ins, "inode=%"PRIu64" file=%s ended, stop",
+ file->inode, file->name);
+ if (ctx->files_static_count == 1) {
+ flb_engine_exit(config);
+ }
+ }
+ /* Promote file to 'events' type handler */
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s promote to TAIL_EVENT",
+ file->inode, file->name);
+
+ /*
+ * When promoting a file from 'static' to 'event' mode, the promoter
+ * will check if the file has been rotated while it was being
+ * processed on this function, if so, it will try to check for the
+ * following condition:
+ *
+ * "discover a new possible file created due to rotation"
+ *
+ * If the condition above is met, a new file entry will be added to
+ * the list that we are processing and a 'new signal' will be send
+ * to the signal manager. But the signal manager will trigger the
+ * message only if no pending messages exists (to avoid queue size
+ * exhaustion).
+ *
+ * All good, but there is a corner case where if no 'active' files
+ * exists, the signal will be read and this function will not be
+ * called again and since the signal did not triggered the
+ * message, the 'new file' enqueued by the nested function
+ * might stay in stale mode (note that altering the length of this
+ * list will not be reflected yet)
+ *
+ * To fix the corner case, we use a variable called 'alter_size'
+ * that determinate if the size of the list keeps the same after
+ * a rotation, so it means: a new file was added.
+ *
+ * We use 'alter_size' as a helper in the conditional below to know
+ * when to stop processing the static list.
+ */
+ if (alter_size == 0) {
+ pre_size = ctx->files_static_count;
+ }
+ ret = flb_tail_file_to_event(file);
+ if (ret == -1) {
+ flb_plg_debug(ctx->ins, "file=%s cannot promote, unregistering",
+ file->name);
+ flb_tail_file_remove(file);
+ }
+
+ if (alter_size == 0) {
+ pos_size = ctx->files_static_count;
+ if (pre_size == pos_size) {
+ alter_size++;
+ }
+ }
+ break;
+ }
+ }
+
+ /*
+ * If there are no more active static handlers, we consume the 'byte' that
+ * triggered this event so this is not longer called again.
+ */
+ if (active == 0 && alter_size == 0) {
+ consume_byte(ctx->ch_manager[0]);
+ ctx->ch_reads++;
+ completed = FLB_TRUE;
+ }
+
+ /* Debugging number of processed bytes */
+ if (flb_log_check_level(ctx->ins->log_level, FLB_LOG_DEBUG)) {
+ flb_utils_bytes_to_human_readable_size(total_processed,
+ s_size, sizeof(s_size));
+ if (completed) {
+ flb_plg_debug(ctx->ins, "[static files] processed %s, done", s_size);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "[static files] processed %s", s_size);
+ }
+ }
+
+ return 0;
+}
+
+static int in_tail_watcher_callback(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ int ret = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_config *ctx = context;
+ struct flb_tail_file *file;
+ (void) config;
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ if (file->is_link == FLB_TRUE) {
+ ret = flb_tail_file_is_rotated(ctx, file);
+ if (ret == FLB_FALSE) {
+ continue;
+ }
+
+ /* The symbolic link name has been rotated */
+ flb_tail_file_rotated(file);
+ }
+ }
+ return ret;
+}
+
+int in_tail_collect_event(void *file, struct flb_config *config)
+{
+ int ret;
+ struct stat st;
+ struct flb_tail_file *f = file;
+
+ ret = fstat(f->fd, &st);
+ if (ret == -1) {
+ flb_tail_file_remove(f);
+ return 0;
+ }
+
+ ret = flb_tail_file_chunk(f);
+ switch (ret) {
+ case FLB_TAIL_ERROR:
+ /* Could not longer read the file */
+ flb_tail_file_remove(f);
+ break;
+ case FLB_TAIL_OK:
+ case FLB_TAIL_WAIT:
+ break;
+ }
+
+ return 0;
+}
+
+/* Initialize plugin */
+static int in_tail_init(struct flb_input_instance *in,
+ struct flb_config *config, void *data)
+{
+ int ret = -1;
+ struct flb_tail_config *ctx = NULL;
+
+ /* Allocate space for the configuration */
+ ctx = flb_tail_config_create(in, config);
+ if (!ctx) {
+ return -1;
+ }
+ ctx->ins = in;
+
+ /* Initialize file-system watcher */
+ ret = flb_tail_fs_init(in, ctx, config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+
+ /* Scan path */
+ flb_tail_scan(ctx->path_list, ctx);
+
+ /*
+ * After the first scan (on start time), all new files discovered needs to be
+ * read from head, so we switch the 'read_from_head' flag to true so any
+ * other file discovered after a scan or a rotation are read from the
+ * beginning.
+ */
+ ctx->read_from_head = FLB_TRUE;
+
+ /* Set plugin context */
+ flb_input_set_context(in, ctx);
+
+ /* Register an event collector */
+ ret = flb_input_set_collector_event(in, in_tail_collect_static,
+ ctx->ch_manager[0], config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_static = ret;
+
+ /* Register re-scan: time managed by 'refresh_interval' property */
+ ret = flb_input_set_collector_time(in, flb_tail_scan_callback,
+ ctx->refresh_interval_sec,
+ ctx->refresh_interval_nsec,
+ config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_scan = ret;
+
+ /* Register watcher, interval managed by 'watcher_interval' property */
+ ret = flb_input_set_collector_time(in, in_tail_watcher_callback,
+ ctx->watcher_interval, 0,
+ config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_watcher = ret;
+
+ /* Register callback to purge rotated files */
+ ret = flb_input_set_collector_time(in, flb_tail_file_purge,
+ ctx->rotate_wait, 0,
+ config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_rotated = ret;
+
+ /* Register callback to process pending bytes in promoted files */
+ ret = flb_input_set_collector_event(in, in_tail_collect_pending,
+ ctx->ch_pending[0], config);//1, 0, config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_pending = ret;
+
+
+ if (ctx->multiline == FLB_TRUE && ctx->parser) {
+ ctx->parser = NULL;
+ flb_plg_warn(in, "on multiline mode 'Parser' is not allowed "
+ "(parser disabled)");
+ }
+
+ /* Register callback to process docker mode queued buffer */
+ if (ctx->docker_mode == FLB_TRUE) {
+ ret = flb_input_set_collector_time(in, flb_tail_dmode_pending_flush,
+ ctx->docker_mode_flush, 0,
+ config);
+ if (ret == -1) {
+ ctx->docker_mode = FLB_FALSE;
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_dmode_flush = ret;
+ }
+
+#ifdef FLB_HAVE_PARSER
+ /* Register callback to process multiline queued buffer */
+ if (ctx->multiline == FLB_TRUE) {
+ ret = flb_input_set_collector_time(in, flb_tail_mult_pending_flush,
+ ctx->multiline_flush, 0,
+ config);
+ if (ret == -1) {
+ ctx->multiline = FLB_FALSE;
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_mult_flush = ret;
+ }
+#endif
+
+ return 0;
+}
+
+/* Pre-run callback / before the event loop */
+static int in_tail_pre_run(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_tail_config *ctx = in_context;
+ (void) ins;
+
+ return tail_signal_manager(ctx);
+}
+
+static int in_tail_exit(void *data, struct flb_config *config)
+{
+ (void) *config;
+ struct flb_tail_config *ctx = data;
+
+ flb_tail_file_remove_all(ctx);
+ flb_tail_fs_exit(ctx);
+ flb_tail_config_destroy(ctx);
+
+ return 0;
+}
+
+static void in_tail_pause(void *data, struct flb_config *config)
+{
+ struct flb_tail_config *ctx = data;
+
+ /*
+ * Pause general collectors:
+ *
+ * - static : static files lookup before promotion
+ */
+ flb_input_collector_pause(ctx->coll_fd_static, ctx->ins);
+ flb_input_collector_pause(ctx->coll_fd_pending, ctx->ins);
+
+ if (ctx->docker_mode == FLB_TRUE) {
+ flb_input_collector_pause(ctx->coll_fd_dmode_flush, ctx->ins);
+ if (config->is_ingestion_active == FLB_FALSE) {
+ flb_plg_info(ctx->ins, "flushing pending docker mode data...");
+ flb_tail_dmode_pending_flush_all(ctx);
+ }
+ }
+
+ if (ctx->multiline == FLB_TRUE) {
+ flb_input_collector_pause(ctx->coll_fd_mult_flush, ctx->ins);
+ if (config->is_ingestion_active == FLB_FALSE) {
+ flb_plg_info(ctx->ins, "flushing pending multiline data...");
+ flb_tail_mult_pending_flush_all(ctx);
+ }
+ }
+
+ /* Pause file system backend handlers */
+ flb_tail_fs_pause(ctx);
+}
+
+static void in_tail_resume(void *data, struct flb_config *config)
+{
+ struct flb_tail_config *ctx = data;
+
+ flb_input_collector_resume(ctx->coll_fd_static, ctx->ins);
+ flb_input_collector_resume(ctx->coll_fd_pending, ctx->ins);
+
+ if (ctx->docker_mode == FLB_TRUE) {
+ flb_input_collector_resume(ctx->coll_fd_dmode_flush, ctx->ins);
+ }
+
+ if (ctx->multiline == FLB_TRUE) {
+ flb_input_collector_resume(ctx->coll_fd_mult_flush, ctx->ins);
+ }
+
+ /* Pause file system backend handlers */
+ flb_tail_fs_resume(ctx);
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_CLIST, "path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, path_list),
+ "pattern specifying log files or multiple ones through "
+ "the use of common wildcards."
+ },
+ {
+ FLB_CONFIG_MAP_CLIST, "exclude_path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, exclude_list),
+ "Set one or multiple shell patterns separated by commas to exclude "
+ "files matching a certain criteria, e.g: 'exclude_path *.gz,*.zip'"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "key", "log",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, key),
+ "when a message is unstructured (no parser applied), it's appended "
+ "as a string under the key name log. This option allows to define an "
+ "alternative name for that key."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "read_from_head", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head),
+ "For new discovered files on start (without a database offset/position), read the "
+ "content from the head of the file, not tail."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "refresh_interval", "60",
+ 0, FLB_FALSE, 0,
+ "interval to refresh the list of watched files expressed in seconds."
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "watcher_interval", "2s",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval),
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "progress_check_interval", "2s",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval),
+ },
+ {
+ FLB_CONFIG_MAP_INT, "progress_check_interval_nsec", "0",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval_nsec),
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait),
+ "specify the number of extra time in seconds to monitor a file once is "
+ "rotated in case some pending data is flushed."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "docker_mode", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode),
+ "If enabled, the plugin will recombine split Docker log lines before "
+ "passing them to any parser as configured above. This mode cannot be "
+ "used at the same time as Multiline."
+ },
+ {
+ FLB_CONFIG_MAP_INT, "docker_mode_flush", "4",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, docker_mode_flush),
+ "wait period time in seconds to flush queued unfinished split lines."
+
+ },
+#ifdef FLB_HAVE_REGEX
+ {
+ FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL,
+ 0, FLB_FALSE, 0,
+ "specify the parser name to fetch log first line for muliline log"
+ },
+#endif
+ {
+ FLB_CONFIG_MAP_STR, "path_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, path_key),
+ "set the 'key' name where the name of monitored file will be appended."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "offset_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, offset_key),
+ "set the 'key' name where the offset of monitored file will be appended."
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "ignore_older", "0",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, ignore_older),
+ "ignore files older than 'ignore_older'. Supports m,h,d (minutes, "
+ "hours, days) syntax. Default behavior is to read all the files."
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_TAIL_CHUNK,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_chunk_size),
+ "set the initial buffer size to read data from files. This value is "
+ "used too to increase buffer size."
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_TAIL_CHUNK,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, buf_max_size),
+ "set the limit of the buffer size per monitored file. When a buffer "
+ "needs to be increased (e.g: very long lines), this value is used to "
+ "restrict how much the memory buffer can grow. If reading a file exceed "
+ "this limit, the file is removed from the monitored file list."
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "static_batch_size", FLB_TAIL_STATIC_BATCH_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, static_batch_size),
+ "On start, Fluent Bit might process files which already contains data, "
+ "these files are called 'static' files. The configuration property "
+ "in question set's the maximum number of bytes to process per iteration "
+ "for the static files monitored."
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "event_batch_size", FLB_TAIL_EVENT_BATCH_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, event_batch_size),
+ "When Fluent Bit is processing files in event based mode the amount of"
+ "data available for consumption could be too much and cause the input plugin "
+ "to over extend and smother other plugins"
+ "The configuration property sets the maximum number of bytes to process per iteration "
+ "for the files monitored (in event mode)."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "skip_long_lines", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_long_lines),
+ "if a monitored file reach it buffer capacity due to a very long line "
+ "(buffer_max_size), the default behavior is to stop monitoring that "
+ "file. This option alter that behavior and instruct Fluent Bit to skip "
+ "long lines and continue processing other lines that fits into the buffer."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "exit_on_eof", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, exit_on_eof),
+ "exit Fluent Bit when reaching EOF on a monitored file."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "skip_empty_lines", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
+ "Allows to skip empty lines."
+ },
+
+#ifdef FLB_HAVE_INOTIFY
+ {
+ FLB_CONFIG_MAP_BOOL, "inotify_watcher", "true",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, inotify_watcher),
+ "set to false to use file stat watcher instead of inotify."
+ },
+#endif
+#ifdef FLB_HAVE_REGEX
+ {
+ FLB_CONFIG_MAP_STR, "parser", NULL,
+ 0, FLB_FALSE, 0,
+ "specify the parser name to process an unstructured message."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tag_regex", NULL,
+ 0, FLB_FALSE, 0,
+ "set a regex to extract fields from the file name and use them later to "
+ "compose the Tag."
+ },
+#endif
+
+#ifdef FLB_HAVE_SQLDB
+ {
+ FLB_CONFIG_MAP_STR, "db", NULL,
+ 0, FLB_FALSE, 0,
+ "set a database file to keep track of monitored files and it offsets."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "db.sync", "normal",
+ 0, FLB_FALSE, 0,
+ "set a database sync method. values: extra, full, normal and off."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "db.locking", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, db_locking),
+ "set exclusive locking mode, increase performance but don't allow "
+ "external connections to the database file."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, db_journal_mode),
+ "Option to provide WAL configuration for Work Ahead Logging mechanism (WAL). Enabling WAL "
+ "provides higher performance. Note that WAL is not compatible with "
+ "shared network file systems."
+ },
+#endif
+
+ /* Multiline Options */
+#ifdef FLB_HAVE_PARSER
+ {
+ FLB_CONFIG_MAP_BOOL, "multiline", "false",
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline),
+ "if enabled, the plugin will try to discover multiline messages and use "
+ "the proper parsers to compose the outgoing messages. Note that when this "
+ "option is enabled the Parser option is not used."
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "multiline_flush", FLB_TAIL_MULT_FLUSH,
+ 0, FLB_TRUE, offsetof(struct flb_tail_config, multiline_flush),
+ "wait period time in seconds to process queued multiline messages."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "parser_firstline", NULL,
+ 0, FLB_FALSE, 0,
+ "name of the parser that matches the beginning of a multiline message. "
+ "Note that the regular expression defined in the parser must include a "
+ "group name (named capture)."
+ },
+ {
+ FLB_CONFIG_MAP_STR_PREFIX, "parser_", NULL,
+ 0, FLB_FALSE, 0,
+ "optional extra parser to interpret and structure multiline entries. This "
+ "option can be used to define multiple parsers, e.g: Parser_1 ab1, "
+ "Parser_2 ab2, Parser_N abN."
+ },
+
+ /* Multiline Core Engine based API */
+ {
+ FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_tail_config, multiline_parsers),
+ "specify one or multiple multiline parsers: docker, cri, go, java, etc."
+ },
+#endif
+
+ /* EOF */
+ {0}
+};
+
+struct flb_input_plugin in_tail_plugin = {
+ .name = "tail",
+ .description = "Tail files",
+ .cb_init = in_tail_init,
+ .cb_pre_run = in_tail_pre_run,
+ .cb_collect = NULL,
+ .cb_flush_buf = NULL,
+ .cb_pause = in_tail_pause,
+ .cb_resume = in_tail_resume,
+ .cb_exit = in_tail_exit,
+ .config_map = config_map,
+ .flags = 0
+};
diff --git a/src/fluent-bit/plugins/in_tail/tail.h b/src/fluent-bit/plugins/in_tail/tail.h
new file mode 100644
index 000000000..074f7a49f
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail.h
@@ -0,0 +1,45 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_H
+#define FLB_TAIL_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+/* Internal return values */
+#define FLB_TAIL_ERROR -1
+#define FLB_TAIL_OK 0
+#define FLB_TAIL_WAIT 1
+#define FLB_TAIL_BUSY 2
+
+/* Consuming mode */
+#define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */
+#define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */
+
+/* Config */
+#define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */
+#define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */
+#define FLB_TAIL_ROTATE_WAIT "5" /* time to monitor after rotation */
+#define FLB_TAIL_STATIC_BATCH_SIZE "50M" /* static batch size */
+#define FLB_TAIL_EVENT_BATCH_SIZE "50M" /* event batch size */
+
+int in_tail_collect_event(void *file, struct flb_config *config);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_config.c b/src/fluent-bit/plugins/in_tail/tail_config.c
new file mode 100644
index 000000000..360b3dbea
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_config.c
@@ -0,0 +1,472 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#include <stdlib.h>
+#include <fcntl.h>
+
+#include "tail_fs.h"
+#include "tail_db.h"
+#include "tail_config.h"
+#include "tail_scan.h"
+#include "tail_sql.h"
+#include "tail_dockermode.h"
+
+#ifdef FLB_HAVE_PARSER
+#include "tail_multiline.h"
+#endif
+
+static int multiline_load_parsers(struct flb_tail_config *ctx)
+{
+ struct mk_list *head;
+ struct mk_list *head_p;
+ struct flb_config_map_val *mv;
+ struct flb_slist_entry *val = NULL;
+ struct flb_ml_parser_ins *parser_i;
+
+ if (!ctx->multiline_parsers) {
+ return 0;
+ }
+
+ /* Create Multiline context using the plugin instance name */
+ ctx->ml_ctx = flb_ml_create(ctx->config, ctx->ins->name);
+ if (!ctx->ml_ctx) {
+ return -1;
+ }
+
+ /*
+ * Iterate all 'multiline.parser' entries. Every entry is considered
+ * a group which can have multiple multiline parser instances.
+ */
+ flb_config_map_foreach(head, mv, ctx->multiline_parsers) {
+ mk_list_foreach(head_p, mv->val.list) {
+ val = mk_list_entry(head_p, struct flb_slist_entry, _head);
+
+ /* Create an instance of the defined parser */
+ parser_i = flb_ml_parser_instance_create(ctx->ml_ctx, val->str);
+ if (!parser_i) {
+ return -1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ int sec;
+ int i;
+ long nsec;
+ const char *tmp;
+ struct flb_tail_config *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_tail_config));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->config = config;
+ ctx->ins = ins;
+ ctx->ignore_older = 0;
+ ctx->skip_long_lines = FLB_FALSE;
+#ifdef FLB_HAVE_SQLDB
+ ctx->db_sync = 1; /* sqlite sync 'normal' */
+#endif
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Create the channel manager */
+ ret = flb_pipe_create(ctx->ch_manager);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(ctx);
+ return NULL;
+ }
+ ctx->ch_reads = 0;
+ ctx->ch_writes = 0;
+
+ /* Create the pending channel */
+ ret = flb_pipe_create(ctx->ch_pending);
+ if (ret == -1) {
+ flb_errno();
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ /* Make pending channel non-blocking */
+ for (i = 0; i <= 1; i++) {
+ ret = flb_pipe_set_nonblocking(ctx->ch_pending[i]);
+ if (ret == -1) {
+ flb_errno();
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ /* Config: path/pattern to read files */
+ if (!ctx->path_list || mk_list_size(ctx->path_list) == 0) {
+ flb_plg_error(ctx->ins, "no input 'path' was given");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* Config: seconds interval before to re-scan the path */
+ tmp = flb_input_get_property("refresh_interval", ins);
+ if (!tmp) {
+ ctx->refresh_interval_sec = FLB_TAIL_REFRESH;
+ ctx->refresh_interval_nsec = 0;
+ }
+ else {
+ ret = flb_utils_time_split(tmp, &sec, &nsec);
+ if (ret == 0) {
+ ctx->refresh_interval_sec = sec;
+ ctx->refresh_interval_nsec = nsec;
+
+ if (sec == 0 && nsec == 0) {
+ flb_plg_error(ctx->ins, "invalid 'refresh_interval' config "
+ "value (%s)", tmp);
+ flb_free(ctx);
+ return NULL;
+ }
+
+ if (sec == 0 && nsec <= 1000000) {
+ flb_plg_warn(ctx->ins, "very low refresh_interval "
+ "(%i.%lu nanoseconds) might cause high CPU usage",
+ sec, nsec);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "invalid 'refresh_interval' config value (%s)",
+ tmp);
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ /* Config: seconds interval to monitor file after rotation */
+ if (ctx->rotate_wait <= 0) {
+ flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value");
+ flb_free(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_PARSER
+ /* Config: multi-line support */
+ if (ctx->multiline == FLB_TRUE) {
+ ret = flb_tail_mult_create(ctx, ins, config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+#endif
+
+ /* Config: Docker mode */
+ if(ctx->docker_mode == FLB_TRUE) {
+ ret = flb_tail_dmode_create(ctx, ins, config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ /* Validate buffer limit */
+ if (ctx->buf_chunk_size > ctx->buf_max_size) {
+ flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk");
+ flb_free(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_REGEX
+ /* Parser / Format */
+ tmp = flb_input_get_property("parser", ins);
+ if (tmp) {
+ ctx->parser = flb_parser_get(tmp, config);
+ if (!ctx->parser) {
+ flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
+ }
+ }
+#endif
+
+ mk_list_init(&ctx->files_static);
+ mk_list_init(&ctx->files_event);
+ mk_list_init(&ctx->files_rotated);
+
+ /* hash table for files lookups */
+ ctx->static_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0);
+ if (!ctx->static_hash) {
+ flb_plg_error(ctx->ins, "could not create static hash");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ ctx->event_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0);
+ if (!ctx->event_hash) {
+ flb_plg_error(ctx->ins, "could not create event hash");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_SQLDB
+ ctx->db = NULL;
+#endif
+
+#ifdef FLB_HAVE_REGEX
+ tmp = flb_input_get_property("tag_regex", ins);
+ if (tmp) {
+ ctx->tag_regex = flb_regex_create(tmp);
+ if (ctx->tag_regex) {
+ ctx->dynamic_tag = FLB_TRUE;
+ }
+ else {
+ flb_plg_error(ctx->ins, "invalid 'tag_regex' config value");
+ }
+ }
+ else {
+ ctx->tag_regex = NULL;
+ }
+#endif
+
+ /* Check if it should use dynamic tags */
+ tmp = strchr(ins->tag, '*');
+ if (tmp) {
+ ctx->dynamic_tag = FLB_TRUE;
+ }
+
+#ifdef FLB_HAVE_SQLDB
+ /* Database options (needs to be set before the context) */
+ tmp = flb_input_get_property("db.sync", ins);
+ if (tmp) {
+ if (strcasecmp(tmp, "extra") == 0) {
+ ctx->db_sync = 3;
+ }
+ else if (strcasecmp(tmp, "full") == 0) {
+ ctx->db_sync = 2;
+ }
+ else if (strcasecmp(tmp, "normal") == 0) {
+ ctx->db_sync = 1;
+ }
+ else if (strcasecmp(tmp, "off") == 0) {
+ ctx->db_sync = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins, "invalid database 'db.sync' value");
+ }
+ }
+
+ /* Initialize database */
+ tmp = flb_input_get_property("db", ins);
+ if (tmp) {
+ ctx->db = flb_tail_db_open(tmp, ins, ctx, config);
+ if (!ctx->db) {
+ flb_plg_error(ctx->ins, "could not open/create database");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ /* Journal mode check */
+ tmp = flb_input_get_property("db.journal_mode", ins);
+ if (tmp) {
+ if (strcasecmp(tmp, "DELETE") != 0 &&
+ strcasecmp(tmp, "TRUNCATE") != 0 &&
+ strcasecmp(tmp, "PERSIST") != 0 &&
+ strcasecmp(tmp, "MEMORY") != 0 &&
+ strcasecmp(tmp, "WAL") != 0 &&
+ strcasecmp(tmp, "OFF") != 0) {
+
+ flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp);
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ /* Prepare Statement */
+ if (ctx->db) {
+ /* SQL_GET_FILE */
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_GET_FILE,
+ -1,
+ &ctx->stmt_get_file,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* SQL_INSERT_FILE */
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_INSERT_FILE,
+ -1,
+ &ctx->stmt_insert_file,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* SQL_ROTATE_FILE */
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_ROTATE_FILE,
+ -1,
+ &ctx->stmt_rotate_file,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* SQL_UPDATE_OFFSET */
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_UPDATE_OFFSET,
+ -1,
+ &ctx->stmt_offset,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* SQL_DELETE_FILE */
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_DELETE_FILE,
+ -1,
+ &ctx->stmt_delete_file,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ }
+#endif
+
+#ifdef FLB_HAVE_PARSER
+ /* Multiline core API */
+ if (ctx->multiline_parsers && mk_list_size(ctx->multiline_parsers) > 0) {
+ ret = multiline_load_parsers(ctx);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "could not load multiline parsers");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+
+ /* Enable auto-flush routine */
+ ret = flb_ml_auto_flush_init(ctx->ml_ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not start multiline auto-flush");
+ flb_tail_config_destroy(ctx);
+ return NULL;
+ }
+ flb_plg_info(ctx->ins, "multiline core started");
+ }
+#endif
+
+#ifdef FLB_HAVE_METRICS
+ ctx->cmt_files_opened = cmt_counter_create(ins->cmt,
+ "fluentbit", "input",
+ "files_opened_total",
+ "Total number of opened files",
+ 1, (char *[]) {"name"});
+
+ ctx->cmt_files_closed = cmt_counter_create(ins->cmt,
+ "fluentbit", "input",
+ "files_closed_total",
+ "Total number of closed files",
+ 1, (char *[]) {"name"});
+
+ ctx->cmt_files_rotated = cmt_counter_create(ins->cmt,
+ "fluentbit", "input",
+ "files_rotated_total",
+ "Total number of rotated files",
+ 1, (char *[]) {"name"});
+
+ /* OLD metrics */
+ flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
+ "files_opened", ctx->ins->metrics);
+ flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED,
+ "files_closed", ctx->ins->metrics);
+ flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED,
+ "files_rotated", ctx->ins->metrics);
+#endif
+
+ return ctx;
+}
+
+int flb_tail_config_destroy(struct flb_tail_config *config)
+{
+
+#ifdef FLB_HAVE_PARSER
+ flb_tail_mult_destroy(config);
+
+ if (config->ml_ctx) {
+ flb_ml_destroy(config->ml_ctx);
+ }
+#endif
+
+ /* Close pipe ends */
+ flb_pipe_close(config->ch_manager[0]);
+ flb_pipe_close(config->ch_manager[1]);
+ flb_pipe_close(config->ch_pending[0]);
+ flb_pipe_close(config->ch_pending[1]);
+
+#ifdef FLB_HAVE_REGEX
+ if (config->tag_regex) {
+ flb_regex_destroy(config->tag_regex);
+ }
+#endif
+
+#ifdef FLB_HAVE_SQLDB
+ if (config->db != NULL) {
+ sqlite3_finalize(config->stmt_get_file);
+ sqlite3_finalize(config->stmt_insert_file);
+ sqlite3_finalize(config->stmt_delete_file);
+ sqlite3_finalize(config->stmt_rotate_file);
+ sqlite3_finalize(config->stmt_offset);
+ flb_tail_db_close(config->db);
+ }
+#endif
+
+ if (config->static_hash) {
+ flb_hash_table_destroy(config->static_hash);
+ }
+ if (config->event_hash) {
+ flb_hash_table_destroy(config->event_hash);
+ }
+
+ flb_free(config);
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_config.h b/src/fluent-bit/plugins/in_tail/tail_config.h
new file mode 100644
index 000000000..dcfa54e02
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_config.h
@@ -0,0 +1,168 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_CONFIG_H
+#define FLB_TAIL_CONFIG_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_sqldb.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_log_event.h>
+#ifdef FLB_HAVE_REGEX
+#include <fluent-bit/flb_regex.h>
+#endif
+#ifdef FLB_HAVE_PARSER
+#include <fluent-bit/multiline/flb_ml.h>
+#endif
+
+#include <xxhash.h>
+
+/* Metrics */
+#ifdef FLB_HAVE_METRICS
+#define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */
+#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
+#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
+#endif
+
+struct flb_tail_config {
+ int fd_notify; /* inotify fd */
+ flb_pipefd_t ch_manager[2]; /* pipe: channel manager */
+ flb_pipefd_t ch_pending[2]; /* pipe: pending events */
+ int ch_reads; /* count number if signal reads */
+ int ch_writes; /* count number of signal writes */
+
+ /* Buffer Config */
+ size_t buf_chunk_size; /* allocation chunks */
+ size_t buf_max_size; /* max size of a buffer */
+
+ /* Static files processor */
+ size_t static_batch_size;
+
+ /* Event files processor */
+ size_t event_batch_size;
+
+ /* Collectors */
+ int coll_fd_static;
+ int coll_fd_scan;
+ int coll_fd_watcher;
+ int coll_fd_rotated;
+ int coll_fd_pending;
+ int coll_fd_inactive;
+ int coll_fd_dmode_flush;
+ int coll_fd_mult_flush;
+ int coll_fd_progress_check;
+
+ /* Backend collectors */
+ int coll_fd_fs1; /* used by fs_inotify & fs_stat */
+ int coll_fd_fs2; /* only used by fs_stat */
+
+ /* Configuration */
+ int dynamic_tag; /* dynamic tag ? e.g: abc.* */
+#ifdef FLB_HAVE_REGEX
+ struct flb_regex *tag_regex;/* path to tag regex */
+#endif
+ int refresh_interval_sec; /* seconds to re-scan */
+ long refresh_interval_nsec;/* nanoseconds to re-scan */
+ int read_from_head; /* read new files from head */
+ int rotate_wait; /* sec to wait on rotated files */
+ int watcher_interval; /* watcher interval */
+ int ignore_older; /* ignore fields older than X seconds */
+ time_t last_pending; /* last time a 'pending signal' was emitted' */
+ struct mk_list *path_list; /* list of paths to scan (glob) */
+ flb_sds_t path_key; /* key name of file path */
+ flb_sds_t key; /* key for unstructured record */
+ int skip_long_lines; /* skip long lines */
+ int skip_empty_lines; /* skip empty lines (off) */
+ int exit_on_eof; /* exit fluent-bit on EOF, test */
+
+ int progress_check_interval; /* watcher interval */
+ int progress_check_interval_nsec; /* watcher interval */
+
+#ifdef FLB_HAVE_INOTIFY
+ int inotify_watcher; /* enable/disable inotify monitor */
+#endif
+ flb_sds_t offset_key; /* key name of file offset */
+
+ /* Database */
+#ifdef FLB_HAVE_SQLDB
+ struct flb_sqldb *db;
+ int db_sync;
+ int db_locking;
+ flb_sds_t db_journal_mode;
+ sqlite3_stmt *stmt_get_file;
+ sqlite3_stmt *stmt_insert_file;
+ sqlite3_stmt *stmt_delete_file;
+ sqlite3_stmt *stmt_rotate_file;
+ sqlite3_stmt *stmt_offset;
+#endif
+
+ /* Parser / Format */
+ struct flb_parser *parser;
+
+ /* Multiline */
+ int multiline; /* multiline enabled ? */
+ int multiline_flush; /* multiline flush/wait */
+ struct flb_parser *mult_parser_firstline;
+ struct mk_list mult_parsers;
+
+ /* Docker mode */
+ int docker_mode; /* Docker mode enabled ? */
+ int docker_mode_flush; /* Docker mode flush/wait */
+ struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */
+
+ /* Multiline core engine */
+ struct flb_ml *ml_ctx;
+ struct mk_list *multiline_parsers;
+
+ uint64_t files_static_count; /* number of items in the static file list */
+ struct mk_list files_static;
+ struct mk_list files_event;
+
+ /* List of rotated files that needs to be removed after 'rotate_wait' */
+ struct mk_list files_rotated;
+
+ /* List of shell patterns used to exclude certain file names */
+ struct mk_list *exclude_list;
+
+ /* Plugin input instance */
+ struct flb_input_instance *ins;
+
+ struct flb_log_event_encoder log_event_encoder;
+ struct flb_log_event_decoder log_event_decoder;
+
+ /* Metrics */
+ struct cmt_counter *cmt_files_opened;
+ struct cmt_counter *cmt_files_closed;
+ struct cmt_counter *cmt_files_rotated;
+
+ /* Hash: hash tables for quick acess to registered files */
+ struct flb_hash_table *static_hash;
+ struct flb_hash_table *event_hash;
+
+ struct flb_config *config;
+};
+
+struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
+ struct flb_config *config);
+int flb_tail_config_destroy(struct flb_tail_config *config);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_db.c b/src/fluent-bit/plugins/in_tail/tail_db.c
new file mode 100644
index 000000000..664963b6d
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_db.c
@@ -0,0 +1,277 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_sqldb.h>
+
+#include "tail_db.h"
+#include "tail_sql.h"
+#include "tail_file.h"
+
+struct query_status {
+ int id;
+ int rows;
+ int64_t offset;
+};
+
+/* Open or create database required by tail plugin */
+struct flb_sqldb *flb_tail_db_open(const char *path,
+ struct flb_input_instance *in,
+ struct flb_tail_config *ctx,
+ struct flb_config *config)
+{
+ int ret;
+ char tmp[64];
+ struct flb_sqldb *db;
+
+ /* Open/create the database */
+ db = flb_sqldb_open(path, in->name, config);
+ if (!db) {
+ return NULL;
+ }
+
+ /* Create table schema if it don't exists */
+ ret = flb_sqldb_query(db, SQL_CREATE_FILES, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not create 'in_tail_files' table");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+
+ if (ctx->db_sync >= 0) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
+ ctx->db_sync);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'sync'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_locking == FLB_TRUE) {
+ ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_journal_mode) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE,
+ ctx->db_journal_mode);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ return db;
+}
+
+int flb_tail_db_close(struct flb_sqldb *db)
+{
+ flb_sqldb_close(db);
+ return 0;
+}
+
+/*
+ * Check if an file inode exists in the database. Return FLB_TRUE or
+ * FLB_FALSE
+ */
+static int db_file_exists(struct flb_tail_file *file,
+ struct flb_tail_config *ctx,
+ uint64_t *id, uint64_t *inode, off_t *offset)
+{
+ int ret;
+ int exists = FLB_FALSE;
+
+ /* Bind parameters */
+ sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode);
+ ret = sqlite3_step(ctx->stmt_get_file);
+
+ if (ret == SQLITE_ROW) {
+ exists = FLB_TRUE;
+
+ /* id: column 0 */
+ *id = sqlite3_column_int64(ctx->stmt_get_file, 0);
+
+ /* offset: column 2 */
+ *offset = sqlite3_column_int64(ctx->stmt_get_file, 2);
+
+ /* inode: column 3 */
+ *inode = sqlite3_column_int64(ctx->stmt_get_file, 3);
+ }
+ else if (ret == SQLITE_DONE) {
+ /* all good */
+ }
+ else {
+ exists = -1;
+ }
+
+ sqlite3_clear_bindings(ctx->stmt_get_file);
+ sqlite3_reset(ctx->stmt_get_file);
+
+ return exists;
+
+}
+
+static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ctx)
+
+{
+ int ret;
+ time_t created;
+
+ /* Register the file */
+ created = time(NULL);
+
+ /* Bind parameters */
+ sqlite3_bind_text(ctx->stmt_insert_file, 1, file->name, -1, 0);
+ sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
+ sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
+ sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
+
+ /* Run the insert */
+ ret = sqlite3_step(ctx->stmt_insert_file);
+ if (ret != SQLITE_DONE) {
+ sqlite3_clear_bindings(ctx->stmt_insert_file);
+ sqlite3_reset(ctx->stmt_insert_file);
+ flb_plg_error(ctx->ins, "cannot execute insert file %s inode=%lu",
+ file->name, file->inode);
+ return -1;
+ }
+
+ sqlite3_clear_bindings(ctx->stmt_insert_file);
+ sqlite3_reset(ctx->stmt_insert_file);
+
+ /* Get the database ID for this file */
+ return flb_sqldb_last_id(ctx->db);
+}
+
+int flb_tail_db_file_set(struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+ uint64_t id = 0;
+ off_t offset = 0;
+ uint64_t inode = 0;
+
+ /* Check if the file exists */
+ ret = db_file_exists(file, ctx, &id, &inode, &offset);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot execute query to check inode: %lu",
+ file->inode);
+ return -1;
+ }
+
+ if (ret == FLB_FALSE) {
+ /* Get the database ID for this file */
+ file->db_id = db_file_insert(file, ctx);
+ }
+ else {
+ file->db_id = id;
+ file->offset = offset;
+ }
+
+ return 0;
+}
+
+/* Update Offset v2 */
+int flb_tail_db_file_offset(struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+
+ /* Bind parameters */
+ sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
+ sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
+
+ ret = sqlite3_step(ctx->stmt_offset);
+
+ if (ret != SQLITE_DONE) {
+ sqlite3_clear_bindings(ctx->stmt_offset);
+ sqlite3_reset(ctx->stmt_offset);
+ return -1;
+ }
+
+ /* Verify number of updated rows */
+ ret = sqlite3_changes(ctx->db->handler);
+ if (ret == 0) {
+ /*
+ * 'someone' like you 'the reader' or another user has deleted the database
+ * entry, just restore it.
+ */
+ file->db_id = db_file_insert(file, ctx);
+ }
+
+ sqlite3_clear_bindings(ctx->stmt_offset);
+ sqlite3_reset(ctx->stmt_offset);
+
+ return 0;
+}
+
+/* Mark a file as rotated v2 */
+int flb_tail_db_file_rotate(const char *new_name,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+
+ /* Bind parameters */
+ sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0);
+ sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id);
+
+ ret = sqlite3_step(ctx->stmt_rotate_file);
+
+ sqlite3_clear_bindings(ctx->stmt_rotate_file);
+ sqlite3_reset(ctx->stmt_rotate_file);
+
+ if (ret != SQLITE_DONE) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Delete file entry from the database */
+int flb_tail_db_file_delete(struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+
+ /* Bind parameters */
+ sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id);
+ ret = sqlite3_step(ctx->stmt_delete_file);
+
+ sqlite3_clear_bindings(ctx->stmt_delete_file);
+ sqlite3_reset(ctx->stmt_delete_file);
+
+ if (ret != SQLITE_DONE) {
+ flb_plg_error(ctx->ins, "db: error deleting entry from database: %s",
+ file->name);
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name);
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_db.h b/src/fluent-bit/plugins/in_tail/tail_db.h
new file mode 100644
index 000000000..7b5355d22
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_db.h
@@ -0,0 +1,43 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_DB_H
+#define FLB_TAIL_DB_H
+
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_sqldb.h>
+
+#include "tail_file.h"
+
+struct flb_sqldb *flb_tail_db_open(const char *path,
+ struct flb_input_instance *in,
+ struct flb_tail_config *ctx,
+ struct flb_config *config);
+
+int flb_tail_db_close(struct flb_sqldb *db);
+int flb_tail_db_file_set(struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+int flb_tail_db_file_offset(struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+int flb_tail_db_file_rotate(const char *new_name,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+int flb_tail_db_file_delete(struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_dockermode.c b/src/fluent-bit/plugins/in_tail/tail_dockermode.c
new file mode 100644
index 000000000..a8f20f9cd
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_dockermode.c
@@ -0,0 +1,459 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_unescape.h>
+
+#include "tail_config.h"
+#include "tail_dockermode.h"
+#include "tail_file_internal.h"
+
+int flb_tail_dmode_create(struct flb_tail_config *ctx,
+ struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ const char *tmp;
+
+ if (ctx->multiline == FLB_TRUE) {
+ flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline "
+ "is enabled");
+ return -1;
+ }
+
+#ifdef FLB_HAVE_REGEX
+ /* First line Parser */
+ tmp = flb_input_get_property("docker_mode_parser", ins);
+ if (tmp) {
+ ctx->docker_mode_parser = flb_parser_get(tmp, config);
+ if (!ctx->docker_mode_parser) {
+ flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
+ }
+ }
+ else {
+ ctx->docker_mode_parser = NULL;
+ }
+#endif
+
+ tmp = flb_input_get_property("docker_mode_flush", ins);
+ if (!tmp) {
+ ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH;
+ }
+ else {
+ ctx->docker_mode_flush = atoi(tmp);
+ if (ctx->docker_mode_flush <= 0) {
+ ctx->docker_mode_flush = 1;
+ }
+ }
+
+ return 0;
+}
+
+static int modify_json_cond(char *js, size_t js_len,
+ char **val, size_t *val_len,
+ char **out, size_t *out_len,
+ int cond(char*, size_t),
+ int mod(char*, size_t, char**, size_t*, void*), void *data)
+{
+ int ret;
+ struct flb_pack_state state;
+ jsmntok_t *t;
+ jsmntok_t *t_val = NULL;
+ int i;
+ int i_root = -1;
+ int i_key = -1;
+ char *old_val;
+ size_t old_val_len;
+ char *new_val = NULL;
+ size_t new_val_len = 0;
+ size_t mod_len;
+
+ ret = flb_pack_state_init(&state);
+ if (ret != 0) {
+ ret = -1;
+ goto modify_json_cond_end;
+ }
+
+ ret = flb_json_tokenise(js, js_len, &state);
+ if (ret != 0 || state.tokens_count == 0) {
+ ret = -1;
+ goto modify_json_cond_end;
+ }
+
+ for (i = 0; i < state.tokens_count; i++) {
+ t = &state.tokens[i];
+
+ if (i_key >= 0) {
+ if (t->parent == i_key) {
+ if (t->type == JSMN_STRING) {
+ t_val = t;
+ }
+ break;
+ }
+ continue;
+ }
+
+ if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) {
+ i_root = i;
+ continue;
+ }
+ if (i_root == -1) {
+ continue;
+ }
+
+ if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) {
+ i_key = i;
+ }
+ }
+
+ if (!t_val) {
+ ret = -1;
+ goto modify_json_cond_end;
+ }
+
+ *out = js;
+ *out_len = js_len;
+
+ if (val) {
+ *val = js + t_val->start;
+ }
+ if (val_len) {
+ *val_len = t_val->end - t_val->start;
+ }
+
+ if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) {
+ old_val = js + t_val->start;
+ old_val_len = t_val->end - t_val->start;
+ ret = mod(old_val, old_val_len, &new_val, &new_val_len, data);
+ if (ret != 0) {
+ ret = -1;
+ goto modify_json_cond_end;
+ }
+
+ ret = 1;
+
+ if (new_val == old_val) {
+ goto modify_json_cond_end;
+ }
+
+ mod_len = js_len + new_val_len - old_val_len;
+ *out = flb_malloc(mod_len);
+ if (!*out) {
+ flb_errno();
+ flb_free(new_val);
+ ret = -1;
+ goto modify_json_cond_end;
+ }
+ *out_len = mod_len;
+
+ memcpy(*out, js, t_val->start);
+ memcpy(*out + t_val->start, new_val, new_val_len);
+ memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end);
+
+ flb_free(new_val);
+ }
+
+ modify_json_cond_end:
+ flb_pack_state_reset(&state);
+ if (ret < 0) {
+ *out = NULL;
+ }
+ return ret;
+}
+
+static int unesc_ends_with_nl(char *str, size_t len)
+{
+ char* unesc;
+ int unesc_len;
+ int nl;
+
+ unesc = flb_malloc(len + 1);
+ if (!unesc) {
+ flb_errno();
+ return FLB_FALSE;
+ }
+ unesc_len = flb_unescape_string(str, len, &unesc);
+ nl = unesc[unesc_len - 1] == '\n';
+ flb_free(unesc);
+ return nl;
+}
+
+static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data)
+{
+ flb_sds_t sds = data;
+
+ if (flb_sds_len(sds) == 0) {
+ *out = str;
+ *out_len = len;
+ return 0;
+ }
+
+ size_t mod_len = flb_sds_len(sds) + len;
+ *out = flb_malloc(mod_len);
+ if (!*out) {
+ flb_errno();
+ return -1;
+ }
+ *out_len = mod_len;
+
+ memcpy(*out, sds, flb_sds_len(sds));
+ memcpy(*out + flb_sds_len(sds), str, len);
+ return 0;
+}
+
+static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data)
+{
+ flb_sds_t sds = data;
+ size_t mod_len = flb_sds_len(sds);
+ *out = flb_malloc(mod_len);
+ if (!*out) {
+ flb_errno();
+ return -1;
+ }
+ *out_len = mod_len;
+
+ memcpy(*out, sds, flb_sds_len(sds));
+ return 0;
+}
+
+int flb_tail_dmode_process_content(time_t now,
+ char* line, size_t line_len,
+ char **repl_line, size_t *repl_line_len,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ char* val = NULL;
+ size_t val_len;
+ int ret;
+ void *out_buf = NULL;
+ size_t out_size;
+ struct flb_time out_time = {0};
+ *repl_line = NULL;
+ *repl_line_len = 0;
+ flb_sds_t tmp;
+ flb_sds_t tmp_copy;
+
+#ifdef FLB_HAVE_REGEX
+ if (ctx->docker_mode_parser) {
+ ret = flb_parser_do(ctx->docker_mode_parser, line, line_len,
+ &out_buf, &out_size, &out_time);
+ flb_free(out_buf);
+
+ /*
+ * Set dmode_firstline if the line meets the first-line requirement
+ */
+ if(ret >= 0) {
+ file->dmode_firstline = true;
+ }
+
+ /*
+ * Process if buffer contains full log line
+ */
+ if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) {
+ /*
+ * Buffered log should be flushed out
+ * as current line meets first-line requirement
+ */
+ if(ret >= 0) {
+ flb_tail_dmode_flush(file, ctx);
+ }
+
+ /*
+ * Flush the buffer if multiline has not been detected yet
+ */
+ if (!file->dmode_firstline) {
+ flb_tail_dmode_flush(file, ctx);
+ }
+ }
+ }
+#endif
+
+ ret = modify_json_cond(line, line_len,
+ &val, &val_len,
+ repl_line, repl_line_len,
+ unesc_ends_with_nl,
+ prepend_sds_to_str, file->dmode_buf);
+ if (ret >= 0) {
+ /* line is a valid json */
+ flb_sds_len_set(file->dmode_lastline, 0);
+
+ /* concatenate current log line with buffered one */
+ tmp = flb_sds_cat(file->dmode_buf, val, val_len);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ file->dmode_buf = tmp;
+
+ tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len);
+ if (!tmp_copy) {
+ flb_errno();
+ return -1;
+ }
+
+ file->dmode_lastline = tmp_copy;
+ file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);
+
+ if (ret == 0) {
+ /* Line not ended with newline */
+ file->dmode_complete = false;
+ }
+ else {
+ /* Line ended with newline */
+ file->dmode_complete = true;
+#ifdef FLB_HAVE_REGEX
+ if (!ctx->docker_mode_parser) {
+ flb_tail_dmode_flush(file, ctx);
+ }
+#else
+ flb_tail_dmode_flush(file, ctx);
+#endif
+ }
+ }
+ return ret;
+}
+
+void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx)
+{
+ int ret;
+ char *repl_line = NULL;
+ size_t repl_line_len = 0;
+ void *out_buf = NULL;
+ size_t out_size;
+ struct flb_time out_time = {0};
+ time_t now = time(NULL);
+
+ if (flb_sds_len(file->dmode_lastline) == 0) {
+ return;
+ }
+
+ flb_time_zero(&out_time);
+
+ ret = modify_json_cond(file->dmode_lastline,
+ flb_sds_len(file->dmode_lastline),
+ NULL, NULL,
+ &repl_line, &repl_line_len,
+ NULL,
+ use_sds, file->dmode_buf);
+ if (ret < 0) {
+ return;
+ }
+
+ flb_sds_len_set(file->dmode_buf, 0);
+ flb_sds_len_set(file->dmode_lastline, 0);
+ file->dmode_flush_timeout = 0;
+
+#ifdef FLB_HAVE_REGEX
+ if (ctx->parser) {
+ ret = flb_parser_do(ctx->parser, repl_line, repl_line_len,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ if (flb_time_to_double(&out_time) == 0) {
+ flb_time_get(&out_time);
+ }
+ if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) {
+ goto dmode_flush_end;
+ }
+
+ flb_tail_pack_line_map(&out_time, (char**) &out_buf, &out_size, file, 0);
+
+ goto dmode_flush_end;
+ }
+ }
+#endif
+
+ flb_tail_file_pack_line(NULL, repl_line, repl_line_len, file, 0);
+
+ dmode_flush_end:
+ flb_free(repl_line);
+ flb_free(out_buf);
+}
+
+static void file_pending_flush(struct flb_tail_config *ctx,
+ struct flb_tail_file *file, time_t now)
+{
+ if (file->dmode_flush_timeout > now) {
+ return;
+ }
+
+ if (flb_sds_len(file->dmode_lastline) == 0) {
+ return;
+ }
+
+ flb_tail_dmode_flush(file, ctx);
+
+ if (file->sl_log_event_encoder->output_length > 0) {
+ flb_input_log_append(ctx->ins,
+ file->tag_buf,
+ file->tag_len,
+ file->sl_log_event_encoder->output_buffer,
+ file->sl_log_event_encoder->output_length);
+
+ flb_log_event_encoder_reset(file->sl_log_event_encoder);
+ }
+}
+
+int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx)
+{
+ time_t expired;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+
+ expired = time(NULL) + 3600;
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, expired);
+ }
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, expired);
+ }
+
+ return 0;
+}
+
+int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ time_t now;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+ struct flb_tail_config *ctx = context;
+
+ now = time(NULL);
+
+ /* Iterate static event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, now);
+ }
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, now);
+ }
+
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_dockermode.h b/src/fluent-bit/plugins/in_tail/tail_dockermode.h
new file mode 100644
index 000000000..50869ff62
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_dockermode.h
@@ -0,0 +1,38 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_DOCKERMODE_H
+#define FLB_TAIL_DOCKERMODE_H
+
+#include "tail_file.h"
+#define FLB_TAIL_DMODE_FLUSH 4
+
+int flb_tail_dmode_create(struct flb_tail_config *ctx,
+ struct flb_input_instance *ins, struct flb_config *config);
+int flb_tail_dmode_process_content(time_t now,
+ char* line, size_t line_len,
+ char **repl_line, size_t *repl_line_len,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx);
+int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
+ struct flb_config *config, void *context);
+int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_file.c b/src/fluent-bit/plugins/in_tail/tail_file.c
new file mode 100644
index 000000000..2385f0626
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_file.c
@@ -0,0 +1,1860 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <time.h>
+#ifdef FLB_SYSTEM_FREEBSD
+#include <sys/user.h>
+#include <libutil.h>
+#endif
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_parser.h>
+#ifdef FLB_HAVE_REGEX
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_hash_table.h>
+#endif
+
+#include "tail.h"
+#include "tail_file.h"
+#include "tail_config.h"
+#include "tail_db.h"
+#include "tail_signal.h"
+#include "tail_dockermode.h"
+#include "tail_multiline.h"
+#include "tail_scan.h"
+
+#ifdef FLB_SYSTEM_WINDOWS
+#include "win32.h"
+#endif
+
+#include <cfl/cfl.h>
+
+static inline void consume_bytes(char *buf, int bytes, int length)
+{
+ memmove(buf, buf + bytes, length - bytes);
+}
+
+static uint64_t stat_get_st_dev(struct stat *st)
+{
+#ifdef FLB_SYSTEM_WINDOWS
+ /* do you want to contribute with a way to extract volume serial number ? */
+ return 0;
+#else
+ return st->st_dev;
+#endif
+}
+
+static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st,
+ uint64_t *out_hash)
+{
+ int len;
+ uint64_t st_dev;
+ char tmp[64];
+
+ st_dev = stat_get_st_dev(st);
+
+ len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64,
+ st_dev, (uint64_t)st->st_ino);
+
+ *out_hash = cfl_hash_64bits(tmp, len);
+ return 0;
+}
+
+static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st,
+ flb_sds_t *key)
+{
+ uint64_t st_dev;
+ flb_sds_t tmp;
+ flb_sds_t buf;
+
+ buf = flb_sds_create_size(64);
+ if (!buf) {
+ return -1;
+ }
+
+ st_dev = stat_get_st_dev(st);
+ tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64,
+ st_dev, (uint64_t)st->st_ino);
+ if (!tmp) {
+ flb_sds_destroy(buf);
+ return -1;
+ }
+
+ *key = buf;
+ return 0;
+}
+
+/* Append custom keys and report the number of records processed */
+static int record_append_custom_keys(struct flb_tail_file *file,
+ char *in_data, size_t in_size,
+ char **out_data, size_t *out_size)
+{
+ int i;
+ int ret;
+ int records = 0;
+ msgpack_object k;
+ msgpack_object v;
+ struct flb_log_event event;
+ struct flb_tail_config *ctx;
+ struct flb_log_event_encoder encoder;
+ struct flb_log_event_decoder decoder;
+
+ ctx = (struct flb_tail_config *) file->config;
+
+ ret = flb_log_event_decoder_init(&decoder, in_data, in_size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ return -1;
+ }
+
+ ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_log_event_decoder_destroy(&decoder);
+
+ return -2;
+ }
+
+ while (flb_log_event_decoder_next(&decoder, &event) ==
+ FLB_EVENT_DECODER_SUCCESS) {
+
+ ret = flb_log_event_encoder_begin_record(&encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp);
+ }
+
+ /* append previous map keys */
+ for (i = 0; i < event.body->via.map.size; i++) {
+ k = event.body->via.map.ptr[i].key;
+ v = event.body->via.map.ptr[i].val;
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ &encoder,
+ &k);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ &encoder,
+ &v);
+ }
+ }
+
+ /* path_key */
+ if (ctx->path_key != NULL) {
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->config->path_key);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->orig_name);
+ }
+ }
+
+ /* offset_key */
+ if (ctx->offset_key != NULL) {
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->config->offset_key);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_uint64(
+ &encoder,
+ file->offset +
+ file->last_processed_bytes);
+ }
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&encoder);
+ }
+ else {
+ flb_plg_error(file->config->ins, "error packing event : %d", ret);
+
+ flb_log_event_encoder_rollback_record(&encoder);
+ }
+
+ /* counter */
+ records++;
+ }
+
+ *out_data = encoder.output_buffer;
+ *out_size = encoder.output_length;
+
+ /* This function transfers ownership of the internal memory allocated by
+ * sbuffer using msgpack_sbuffer_release which means the caller is
+ * responsible for releasing the memory.
+ */
+ flb_log_event_encoder_claim_internal_buffer_ownership(&encoder);
+
+ flb_log_event_decoder_destroy(&decoder);
+ flb_log_event_encoder_destroy(&encoder);
+
+ return records;
+}
+
+static int flb_tail_repack_map(struct flb_log_event_encoder *encoder,
+ char *data,
+ size_t data_size)
+{
+ msgpack_unpacked source_map;
+ size_t offset;
+ int result;
+ size_t index;
+ msgpack_object value;
+ msgpack_object key;
+
+ result = FLB_EVENT_ENCODER_SUCCESS;
+
+ if (data_size > 0) {
+ msgpack_unpacked_init(&source_map);
+
+ offset = 0;
+ result = msgpack_unpack_next(&source_map,
+ data,
+ data_size,
+ &offset);
+
+ if (result == MSGPACK_UNPACK_SUCCESS) {
+ result = FLB_EVENT_ENCODER_SUCCESS;
+ }
+ else {
+ result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
+ }
+
+ for (index = 0;
+ index < source_map.data.via.map.size &&
+ result == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+ key = source_map.data.via.map.ptr[index].key;
+ value = source_map.data.via.map.ptr[index].val;
+
+ result = flb_log_event_encoder_append_body_msgpack_object(
+ encoder,
+ &key);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_msgpack_object(
+ encoder,
+ &value);
+ }
+ }
+
+ msgpack_unpacked_destroy(&source_map);
+ }
+
+ return result;
+}
+
+int flb_tail_pack_line_map(struct flb_time *time, char **data,
+ size_t *data_size, struct flb_tail_file *file,
+ size_t processed_bytes)
+{
+ int result;
+
+ result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ file->sl_log_event_encoder, time);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_tail_repack_map(file->sl_log_event_encoder,
+ *data,
+ *data_size);
+ }
+
+ /* path_key */
+ if (file->config->path_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
+ FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
+ file->orig_name_len));
+ }
+ }
+
+ /* offset_key */
+ if (file->config->offset_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
+ FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
+ }
+ else {
+ flb_log_event_encoder_rollback_record(file->sl_log_event_encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins, "error packing event");
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size,
+ struct flb_tail_file *file, size_t processed_bytes)
+{
+ int result;
+
+ result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ file->sl_log_event_encoder, time);
+ }
+
+ /* path_key */
+ if (file->config->path_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
+ FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
+ file->orig_name_len));
+ }
+ }
+
+ /* offset_key */
+ if (file->config->offset_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
+ FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->key),
+ FLB_LOG_EVENT_STRING_VALUE(data,
+ data_size));
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins, "error packing event : %d", result);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size)
+{
+ int result;
+
+ result = flb_log_event_encoder_emit_raw_record(
+ file->ml_log_event_encoder,
+ buf_data, buf_size);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins,
+ "log event raw append error : %d",
+ result);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+ if (file->ml_log_event_encoder->output_length > 0) {
+ flb_input_log_append(ctx->ins,
+ file->tag_buf,
+ file->tag_len,
+ file->ml_log_event_encoder->output_buffer,
+ file->ml_log_event_encoder->output_length);
+
+ flb_log_event_encoder_reset(file->ml_log_event_encoder);
+ }
+
+ return 0;
+}
+
+static int process_content(struct flb_tail_file *file, size_t *bytes)
+{
+ size_t len;
+ int lines = 0;
+ int ret;
+ size_t processed_bytes = 0;
+ char *data;
+ char *end;
+ char *p;
+ void *out_buf;
+ size_t out_size;
+ int crlf;
+ char *line;
+ size_t line_len;
+ char *repl_line;
+ size_t repl_line_len;
+ time_t now = time(NULL);
+ struct flb_time out_time = {0};
+ struct flb_tail_config *ctx;
+
+ ctx = (struct flb_tail_config *) file->config;
+
+ /* Parse the data content */
+ data = file->buf_data;
+ end = data + file->buf_len;
+
+ /* reset last processed bytes */
+ file->last_processed_bytes = 0;
+
+ /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */
+ while (data < end && *data == '\0') {
+ data++;
+ processed_bytes++;
+ }
+
+ while (data < end && (p = memchr(data, '\n', end - data))) {
+ len = (p - data);
+ crlf = 0;
+ if (file->skip_next == FLB_TRUE) {
+ data += len + 1;
+ processed_bytes += len + 1;
+ file->skip_next = FLB_FALSE;
+ continue;
+ }
+
+ /*
+ * Empty line (just breakline)
+ * ---------------------------
+ * [NOTE] with the new Multiline core feature and Multiline Filter on
+ * Fluent Bit v1.8.2, there are a couple of cases where stack traces
+ * or multi line patterns expects an empty line (meaning only the
+ * breakline), skipping empty lines on this plugin will break that
+ * functionality.
+ *
+ * We are introducing 'skip_empty_lines=off' configuration
+ * property to revert this behavior if some user is affected by
+ * this change.
+ */
+
+ if (len == 0 && ctx->skip_empty_lines) {
+ data++;
+ processed_bytes++;
+ continue;
+ }
+
+ /* Process '\r\n' */
+ if (len >= 2) {
+ crlf = (data[len-1] == '\r');
+ if (len == 1 && crlf) {
+ data += 2;
+ processed_bytes += 2;
+ continue;
+ }
+ }
+
+ /* Reset time for each line */
+ flb_time_zero(&out_time);
+
+ line = data;
+ line_len = len - crlf;
+ repl_line = NULL;
+
+ if (ctx->ml_ctx) {
+ ret = flb_ml_append_text(ctx->ml_ctx,
+ file->ml_stream_id,
+ &out_time,
+ line,
+ line_len);
+ goto go_next;
+ }
+ else if (ctx->docker_mode) {
+ ret = flb_tail_dmode_process_content(now, line, line_len,
+ &repl_line, &repl_line_len,
+ file, ctx);
+ if (ret >= 0) {
+ if (repl_line == line) {
+ repl_line = NULL;
+ }
+ else {
+ line = repl_line;
+ line_len = repl_line_len;
+ }
+ /* Skip normal parsers flow */
+ goto go_next;
+ }
+ else {
+ flb_tail_dmode_flush(file, ctx);
+ }
+ }
+
+#ifdef FLB_HAVE_PARSER
+ if (ctx->parser) {
+ /* Common parser (non-multiline) */
+ ret = flb_parser_do(ctx->parser, line, line_len,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ if (flb_time_to_nanosec(&out_time) == 0L) {
+ flb_time_get(&out_time);
+ }
+
+ /* If multiline is enabled, flush any buffered data */
+ if (ctx->multiline == FLB_TRUE) {
+ flb_tail_mult_flush(file, ctx);
+ }
+
+ flb_tail_pack_line_map(&out_time,
+ (char**) &out_buf, &out_size, file,
+ processed_bytes);
+
+ flb_free(out_buf);
+ }
+ else {
+ /* Parser failed, pack raw text */
+ flb_tail_file_pack_line(NULL, data, len, file, processed_bytes);
+ }
+ }
+ else if (ctx->multiline == FLB_TRUE) {
+ ret = flb_tail_mult_process_content(now,
+ line, line_len,
+ file, ctx, processed_bytes);
+
+ /* No multiline */
+ if (ret == FLB_TAIL_MULT_NA) {
+ flb_tail_mult_flush(file, ctx);
+
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+ }
+ else if (ret == FLB_TAIL_MULT_MORE) {
+ /* we need more data, do nothing */
+ goto go_next;
+ }
+ else if (ret == FLB_TAIL_MULT_DONE) {
+ /* Finalized */
+ }
+ }
+ else {
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+ }
+#else
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+#endif
+
+ go_next:
+ flb_free(repl_line);
+ repl_line = NULL;
+ /* Adjust counters */
+ data += len + 1;
+ processed_bytes += len + 1;
+ lines++;
+ file->parsed = 0;
+ file->last_processed_bytes += processed_bytes;
+ }
+ file->parsed = file->buf_len;
+
+ if (lines > 0) {
+ /* Append buffer content to a chunk */
+ *bytes = processed_bytes;
+
+ if (file->sl_log_event_encoder->output_length > 0) {
+ flb_input_log_append_records(ctx->ins,
+ lines,
+ file->tag_buf,
+ file->tag_len,
+ file->sl_log_event_encoder->output_buffer,
+ file->sl_log_event_encoder->output_length);
+
+ flb_log_event_encoder_reset(file->sl_log_event_encoder);
+ }
+ }
+ else if (file->skip_next) {
+ *bytes = file->buf_len;
+ }
+ else {
+ *bytes = processed_bytes;
+ }
+
+ if (ctx->ml_ctx) {
+ ml_stream_buffer_flush(ctx, file);
+ }
+
+ return lines;
+}
+
+static inline void drop_bytes(char *buf, size_t len, int pos, int bytes)
+{
+ memmove(buf + pos,
+ buf + pos + bytes,
+ len - pos - bytes);
+}
+
+#ifdef FLB_HAVE_REGEX
+static void cb_results(const char *name, const char *value,
+ size_t vlen, void *data)
+{
+ struct flb_hash_table *ht = data;
+
+ if (vlen == 0) {
+ return;
+ }
+
+ flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen);
+}
+#endif
+
+#ifdef FLB_HAVE_REGEX
+static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname,
+ char *out_buf, size_t *out_size,
+ struct flb_tail_config *ctx)
+#else
+static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size,
+ struct flb_tail_config *ctx)
+#endif
+{
+ int i;
+ size_t len;
+ char *p;
+ size_t buf_s = 0;
+#ifdef FLB_HAVE_REGEX
+ ssize_t n;
+ struct flb_regex_search result;
+ struct flb_hash_table *ht;
+ char *beg;
+ char *end;
+ int ret;
+ const char *tmp;
+ size_t tmp_s;
+#endif
+
+#ifdef FLB_HAVE_REGEX
+ if (tag_regex) {
+ n = flb_regex_do(tag_regex, fname, strlen(fname), &result);
+ if (n <= 0) {
+ flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s",
+ fname);
+ return -1;
+ }
+ else {
+ ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
+ FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE);
+ flb_regex_parse(tag_regex, &result, cb_results, ht);
+
+ for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) {
+ if (beg != p) {
+ len = (beg - p);
+ memcpy(out_buf + buf_s, p, len);
+ buf_s += len;
+ }
+
+ beg++;
+
+ end = strchr(beg, '>');
+ if (end && !memchr(beg, '<', end - beg)) {
+ end--;
+
+ len = end - beg + 1;
+ ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s);
+ if (ret != -1) {
+ memcpy(out_buf + buf_s, tmp, tmp_s);
+ buf_s += tmp_s;
+ }
+ else {
+ memcpy(out_buf + buf_s, "_", 1);
+ buf_s++;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "missing closing angle bracket in tag %s "
+ "at position %lu", tag, beg - tag);
+ flb_hash_table_destroy(ht);
+ return -1;
+ }
+ }
+
+ flb_hash_table_destroy(ht);
+ if (*p) {
+ len = strlen(p);
+ memcpy(out_buf + buf_s, p, len);
+ buf_s += len;
+ }
+ }
+ }
+ else {
+#endif
+ p = strchr(tag, '*');
+ if (!p) {
+ return -1;
+ }
+
+ /* Copy tag prefix if any */
+ len = (p - tag);
+ if (len > 0) {
+ memcpy(out_buf, tag, len);
+ buf_s += len;
+ }
+
+ /* Append file name */
+ len = strlen(fname);
+ memcpy(out_buf + buf_s, fname, len);
+ buf_s += len;
+
+ /* Tag suffix (if any) */
+ p++;
+ if (*p) {
+ len = strlen(tag);
+ memcpy(out_buf + buf_s, p, (len - (p - tag)));
+ buf_s += (len - (p - tag));
+ }
+
+ /* Sanitize buffer */
+ for (i = 0; i < buf_s; i++) {
+ if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') {
+ if (i > 0) {
+ out_buf[i] = '.';
+ }
+ else {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+
+ if (i > 0 && out_buf[i] == '.') {
+ if (out_buf[i - 1] == '.') {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+ else if (out_buf[i] == '*') {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+
+ /* Check for an ending '.' */
+ if (out_buf[buf_s - 1] == '.') {
+ drop_bytes(out_buf, buf_s, buf_s - 1, 1);
+ buf_s--;
+ }
+#ifdef FLB_HAVE_REGEX
+ }
+#endif
+
+ out_buf[buf_s] = '\0';
+ *out_size = buf_s;
+
+ return 0;
+}
+
+static inline int flb_tail_file_exists(struct stat *st,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+ uint64_t hash;
+
+ ret = stat_to_hash_bits(ctx, st, &hash);
+ if (ret != 0) {
+ return -1;
+ }
+
+ /* static hash */
+ if (flb_hash_table_exists(ctx->static_hash, hash)) {
+ return FLB_TRUE;
+ }
+
+ /* event hash */
+ if (flb_hash_table_exists(ctx->event_hash, hash)) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/*
+ * Based in the configuration or database offset, set the proper 'offset' for the
+ * file in question.
+ */
+static int set_file_position(struct flb_tail_config *ctx,
+ struct flb_tail_file *file)
+{
+ int64_t ret;
+
+#ifdef FLB_HAVE_SQLDB
+ /*
+ * If the database option is enabled, try to gather the file position. The
+ * database function updates the file->offset entry.
+ */
+ if (ctx->db) {
+ ret = flb_tail_db_file_set(file, ctx);
+ if (ret == 0) {
+ if (file->offset > 0) {
+ ret = lseek(file->fd, file->offset, SEEK_SET);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ }
+ else if (ctx->read_from_head == FLB_FALSE) {
+ ret = lseek(file->fd, 0, SEEK_END);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ file->offset = ret;
+ flb_tail_db_file_offset(file, ctx);
+ }
+ return 0;
+ }
+ }
+#endif
+
+ if (ctx->read_from_head == FLB_TRUE) {
+ /* no need to seek, offset position is already zero */
+ return 0;
+ }
+
+ /* tail... */
+ ret = lseek(file->fd, 0, SEEK_END);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ file->offset = ret;
+
+ return 0;
+}
+
+/* Multiline flush callback: invoked every time some content is complete */
+static int ml_flush_callback(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ int result;
+ size_t mult_size = 0;
+ char *mult_buf = NULL;
+ struct flb_tail_file *file = data;
+ struct flb_tail_config *ctx = file->config;
+
+ if (ctx->path_key == NULL && ctx->offset_key == NULL) {
+ ml_stream_buffer_append(file, buf_data, buf_size);
+ }
+ else {
+ /* adjust the records in a new buffer */
+ result = record_append_custom_keys(file,
+ buf_data,
+ buf_size,
+ &mult_buf,
+ &mult_size);
+
+ if (result < 0) {
+ ml_stream_buffer_append(file, buf_data, buf_size);
+ }
+ else {
+ ml_stream_buffer_append(file, mult_buf, mult_size);
+
+ flb_free(mult_buf);
+ }
+ }
+
+ if (mst->forced_flush) {
+ ml_stream_buffer_flush(ctx, file);
+ }
+
+ return 0;
+}
+
+int flb_tail_file_append(char *path, struct stat *st, int mode,
+ struct flb_tail_config *ctx)
+{
+ int fd;
+ int ret;
+ uint64_t stream_id;
+ uint64_t ts;
+ uint64_t hash_bits;
+ flb_sds_t hash_key;
+ size_t len;
+ char *tag;
+ char *name;
+ size_t tag_len;
+ struct flb_tail_file *file;
+ struct stat lst;
+ flb_sds_t inode_str;
+
+ if (!S_ISREG(st->st_mode)) {
+ return -1;
+ }
+
+ if (flb_tail_file_exists(st, ctx) == FLB_TRUE) {
+ return -1;
+ }
+
+ fd = open(path, O_RDONLY);
+ if (fd == -1) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot open %s", path);
+ return -1;
+ }
+
+ file = flb_calloc(1, sizeof(struct flb_tail_file));
+ if (!file) {
+ flb_errno();
+ goto error;
+ }
+
+ /* Initialize */
+ file->watch_fd = -1;
+ file->fd = fd;
+
+ /* On non-windows environments check if the original path is a link */
+ ret = lstat(path, &lst);
+ if (ret == 0) {
+ if (S_ISLNK(lst.st_mode)) {
+ file->is_link = FLB_TRUE;
+ file->link_inode = lst.st_ino;
+ }
+ }
+
+ /* get unique hash for this file */
+ ret = stat_to_hash_bits(ctx, st, &hash_bits);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path);
+ goto error;
+ }
+ file->hash_bits = hash_bits;
+
+ /* store the hash key used for hash_bits */
+ ret = stat_to_hash_key(ctx, st, &hash_key);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path);
+ goto error;
+ }
+ file->hash_key = hash_key;
+
+ file->inode = st->st_ino;
+ file->offset = 0;
+ file->size = st->st_size;
+ file->buf_len = 0;
+ file->parsed = 0;
+ file->config = ctx;
+ file->tail_mode = mode;
+ file->tag_len = 0;
+ file->tag_buf = NULL;
+ file->rotated = 0;
+ file->pending_bytes = 0;
+ file->mult_firstline = FLB_FALSE;
+ file->mult_keys = 0;
+ file->mult_flush_timeout = 0;
+ file->mult_skipping = FLB_FALSE;
+
+ /*
+ * Duplicate string into 'file' structure, the called function
+ * take cares to resolve real-name of the file in case we are
+ * running in a non-Linux system.
+ *
+ * Depending of the operating system, the way to obtain the file
+ * name associated to it file descriptor can have different behaviors
+ * specifically if it root path it's under a symbolic link. On Linux
+ * we can trust the file name but in others it's better to solve it
+ * with some extra calls.
+ */
+ ret = flb_tail_file_name_dup(path, file);
+ if (!file->name) {
+ flb_errno();
+ goto error;
+ }
+
+ /* We keep a copy of the initial filename in orig_name. This is required
+ * for path_key to continue working after rotation. */
+ file->orig_name = flb_strdup(file->name);
+ if (!file->orig_name) {
+ flb_free(file->name);
+ flb_errno();
+ goto error;
+ }
+ file->orig_name_len = file->name_len;
+
+ /* multiline msgpack buffers */
+ file->mult_records = 0;
+ msgpack_sbuffer_init(&file->mult_sbuf);
+ msgpack_packer_init(&file->mult_pck, &file->mult_sbuf,
+ msgpack_sbuffer_write);
+
+ /* docker mode */
+ file->dmode_flush_timeout = 0;
+ file->dmode_complete = true;
+ file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
+ file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0);
+ file->dmode_firstline = false;
+#ifdef FLB_HAVE_SQLDB
+ file->db_id = 0;
+#endif
+ file->skip_next = FLB_FALSE;
+ file->skip_warn = FLB_FALSE;
+
+ /* Multiline core mode */
+ if (ctx->ml_ctx) {
+ /*
+ * Create inode str to get stream_id.
+ *
+ * If stream_id is created by filename,
+ * it will be same after file rotation and it causes invalid destruction:
+ *
+ * - https://github.com/fluent/fluent-bit/issues/4190
+ */
+ inode_str = flb_sds_create_size(64);
+ flb_sds_printf(&inode_str, "%"PRIu64, file->inode);
+
+ /* Create a stream for this file */
+ ret = flb_ml_stream_create(ctx->ml_ctx,
+ inode_str, flb_sds_len(inode_str),
+ ml_flush_callback, file,
+ &stream_id);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins,
+ "could not create multiline stream for file: %s",
+ inode_str);
+ flb_sds_destroy(inode_str);
+ goto error;
+ }
+ file->ml_stream_id = stream_id;
+ flb_sds_destroy(inode_str);
+
+ /*
+ * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready
+ * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In
+ * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline
+ * without any previous buffering which leads to performance degradation.
+ *
+ * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish
+ * processing the "read() bytes".
+ */
+ }
+
+ /* Local buffer */
+ file->buf_size = ctx->buf_chunk_size;
+ file->buf_data = flb_malloc(file->buf_size);
+ if (!file->buf_data) {
+ flb_errno();
+ goto error;
+ }
+
+ /* Initialize (optional) dynamic tag */
+ if (ctx->dynamic_tag == FLB_TRUE) {
+ len = ctx->ins->tag_len + strlen(path) + 1;
+ tag = flb_malloc(len);
+ if (!tag) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "failed to allocate tag buffer");
+ goto error;
+ }
+#ifdef FLB_HAVE_REGEX
+ ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx);
+#else
+ ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx);
+#endif
+ if (ret == 0) {
+ file->tag_len = tag_len;
+ file->tag_buf = flb_strdup(tag);
+ }
+ flb_free(tag);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path);
+ goto error;
+ }
+ }
+ else {
+ file->tag_len = strlen(ctx->ins->tag);
+ file->tag_buf = flb_strdup(ctx->ins->tag);
+ }
+ if (!file->tag_buf) {
+ flb_plg_error(ctx->ins, "failed to set tag for file: %s", path);
+ flb_errno();
+ goto error;
+ }
+
+ if (mode == FLB_TAIL_STATIC) {
+ mk_list_add(&file->_head, &ctx->files_static);
+ ctx->files_static_count++;
+ flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+ tail_signal_manager(file->config);
+ }
+ else if (mode == FLB_TAIL_EVENT) {
+ mk_list_add(&file->_head, &ctx->files_event);
+ flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+
+ /* Register this file into the fs_event monitoring */
+ ret = flb_tail_fs_add(ctx, file);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register file into fs_events");
+ goto error;
+ }
+ }
+
+ /* Set the file position (database offset, head or tail) */
+ ret = set_file_position(ctx, file);
+ if (ret == -1) {
+ flb_tail_file_remove(file);
+ goto error;
+ }
+
+ /* Remaining bytes to read */
+ file->pending_bytes = file->size - file->offset;
+
+#ifdef FLB_HAVE_METRICS
+ name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name});
+
+ /* Old api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
+#endif
+
+ file->sl_log_event_encoder = flb_log_event_encoder_create(
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (file->sl_log_event_encoder == NULL) {
+ flb_tail_file_remove(file);
+
+ goto error;
+ }
+
+ file->ml_log_event_encoder = flb_log_event_encoder_create(
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (file->ml_log_event_encoder == NULL) {
+ flb_tail_file_remove(file);
+
+ goto error;
+ }
+
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" with offset=%"PRId64" appended as %s",
+ file->inode, file->offset, path);
+ return 0;
+
+error:
+ if (file) {
+ if (file->buf_data) {
+ flb_free(file->buf_data);
+ }
+ if (file->name) {
+ flb_free(file->name);
+ }
+ flb_free(file);
+ }
+ close(fd);
+
+ return -1;
+}
+
+void flb_tail_file_remove(struct flb_tail_file *file)
+{
+ uint64_t ts;
+ char *name;
+ struct flb_tail_config *ctx;
+
+ ctx = file->config;
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s",
+ file->inode, file->name);
+
+ /* remove the multiline.core stream */
+ if (ctx->ml_ctx && file->ml_stream_id > 0) {
+ /* destroy ml stream */
+ flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id);
+ }
+
+ if (file->rotated > 0) {
+#ifdef FLB_HAVE_SQLDB
+ /*
+ * Make sure to remove a the file entry from the database if the file
+ * was rotated and it's not longer being monitored.
+ */
+ if (ctx->db) {
+ flb_tail_db_file_delete(file, file->config);
+ }
+#endif
+ mk_list_del(&file->_rotate_head);
+ }
+
+ msgpack_sbuffer_destroy(&file->mult_sbuf);
+
+ if (file->sl_log_event_encoder != NULL) {
+ flb_log_event_encoder_destroy(file->sl_log_event_encoder);
+ }
+
+ if (file->ml_log_event_encoder != NULL) {
+ flb_log_event_encoder_destroy(file->ml_log_event_encoder);
+ }
+
+ flb_sds_destroy(file->dmode_buf);
+ flb_sds_destroy(file->dmode_lastline);
+ mk_list_del(&file->_head);
+ flb_tail_fs_remove(ctx, file);
+
+ /* avoid deleting file with -1 fd */
+ if (file->fd != -1) {
+ close(file->fd);
+ }
+ if (file->tag_buf) {
+ flb_free(file->tag_buf);
+ }
+
+ /* remove any potential entry from the hash tables */
+ flb_hash_table_del(ctx->static_hash, file->hash_key);
+ flb_hash_table_del(ctx->event_hash, file->hash_key);
+
+ flb_free(file->buf_data);
+ flb_free(file->name);
+ flb_free(file->orig_name);
+ flb_free(file->real_name);
+ flb_sds_destroy(file->hash_key);
+
+#ifdef FLB_HAVE_METRICS
+ name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name});
+
+ /* old api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics);
+#endif
+
+ flb_free(file);
+}
+
+int flb_tail_file_remove_all(struct flb_tail_config *ctx)
+{
+ int count = 0;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_tail_file *file;
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ flb_tail_file_remove(file);
+ count++;
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ flb_tail_file_remove(file);
+ count++;
+ }
+
+ return count;
+}
+
+static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+ int ret;
+ int64_t offset;
+ struct stat st;
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Check if the file was truncated */
+ if (file->offset > st.st_size) {
+ offset = lseek(file->fd, 0, SEEK_SET);
+ if (offset == -1) {
+ flb_errno();
+ return FLB_TAIL_ERROR;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
+ file->inode, file->name);
+ file->offset = offset;
+ file->buf_len = 0;
+
+ /* Update offset in the database file */
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ flb_tail_db_file_offset(file, ctx);
+ }
+#endif
+ }
+ else {
+ file->size = st.st_size;
+ file->pending_bytes = (st.st_size - file->offset);
+ }
+
+ return FLB_TAIL_OK;
+}
+
+int flb_tail_file_chunk(struct flb_tail_file *file)
+{
+ int ret;
+ char *tmp;
+ size_t size;
+ size_t capacity;
+ size_t processed_bytes;
+ ssize_t bytes;
+ struct flb_tail_config *ctx;
+
+ /* Check if we the engine issued a pause */
+ ctx = file->config;
+ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
+ return FLB_TAIL_BUSY;
+ }
+
+ capacity = (file->buf_size - file->buf_len) - 1;
+ if (capacity < 1) {
+ /*
+ * If there is no more room for more data, try to increase the
+ * buffer under the limit of buffer_max_size.
+ */
+ if (file->buf_size >= ctx->buf_max_size) {
+ if (ctx->skip_long_lines == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "
+ "lines are too long. Skipping file.", file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Warn the user */
+ if (file->skip_warn == FLB_FALSE) {
+ flb_plg_warn(ctx->ins, "file=%s have long lines. "
+ "Skipping long lines.", file->name);
+ file->skip_warn = FLB_TRUE;
+ }
+
+ /* Do buffer adjustments */
+ file->offset += file->buf_len;
+ file->buf_len = 0;
+ file->skip_next = FLB_TRUE;
+ }
+ else {
+ size = file->buf_size + ctx->buf_chunk_size;
+ if (size > ctx->buf_max_size) {
+ size = ctx->buf_max_size;
+ }
+
+ /* Increase the buffer size */
+ tmp = flb_realloc(file->buf_data, size);
+ if (tmp) {
+ flb_plg_trace(ctx->ins, "file=%s increase buffer size "
+ "%lu => %lu bytes",
+ file->name, file->buf_size, size);
+ file->buf_data = tmp;
+ file->buf_size = size;
+ }
+ else {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot increase buffer size for %s, "
+ "skipping file.", file->name);
+ return FLB_TAIL_ERROR;
+ }
+ }
+ capacity = (file->buf_size - file->buf_len) - 1;
+ }
+
+ bytes = read(file->fd, file->buf_data + file->buf_len, capacity);
+ if (bytes > 0) {
+ /* we read some data, let the content processor take care of it */
+ file->buf_len += bytes;
+ file->buf_data[file->buf_len] = '\0';
+
+ /* Now that we have some data in the buffer, call the data processor
+ * which aims to cut lines and register the entries into the engine.
+ *
+ * The returned value is the absolute offset the file must be seek
+ * now. It may need to get back a few bytes at the beginning of a new
+ * line.
+ */
+ ret = process_content(file, &processed_bytes);
+ if (ret < 0) {
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
+ file->inode, file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Adjust the file offset and buffer */
+ file->offset += processed_bytes;
+ consume_bytes(file->buf_data, processed_bytes, file->buf_len);
+ file->buf_len -= processed_bytes;
+ file->buf_data[file->buf_len] = '\0';
+
+#ifdef FLB_HAVE_SQLDB
+ if (file->config->db) {
+ flb_tail_db_file_offset(file, file->config);
+ }
+#endif
+
+ /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
+ ret = adjust_counters(ctx, file);
+
+ /* Data was consumed but likely some bytes still remain */
+ return ret;
+ }
+ else if (bytes == 0) {
+ /* We reached the end of file, let's wait for some incoming data */
+ ret = adjust_counters(ctx, file);
+ if (ret == FLB_TAIL_OK) {
+ return FLB_TAIL_WAIT;
+ }
+ else {
+ return FLB_TAIL_ERROR;
+ }
+ }
+ else {
+ /* error */
+ flb_errno();
+ flb_plg_error(ctx->ins, "error reading %s", file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ return FLB_TAIL_ERROR;
+}
+
+/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */
+int flb_tail_file_is_rotated(struct flb_tail_config *ctx,
+ struct flb_tail_file *file)
+{
+ int ret;
+ char *name;
+ struct stat st;
+
+ /*
+ * Do not double-check already rotated files since the caller of this
+ * function will trigger a rotation.
+ */
+ if (file->rotated != 0) {
+ return FLB_FALSE;
+ }
+
+ /* Check if the 'original monitored file' is a link and rotated */
+ if (file->is_link == FLB_TRUE) {
+ ret = lstat(file->name, &st);
+ if (ret == -1) {
+ /* Broken link or missing file */
+ if (errno == ENOENT) {
+ flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s",
+ file->link_inode, file->name);
+ return FLB_TRUE;
+ }
+ else {
+ flb_errno();
+ flb_plg_error(ctx->ins,
+ "link_inode=%"PRIu64" cannot detect if file: %s",
+ file->link_inode, file->name);
+ return -1;
+ }
+ }
+ else {
+ /* The file name is there, check if the same that we have */
+ if (st.st_ino != file->link_inode) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ /* Retrieve the real file name, operating system lookup */
+ name = flb_tail_file_name(file);
+ if (!name) {
+ flb_plg_error(ctx->ins,
+ "inode=%"PRIu64" cannot detect if file was rotated: %s",
+ file->inode, file->name);
+ return -1;
+ }
+
+
+ /* Get stats from the file name */
+ ret = stat(name, &st);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(name);
+ return -1;
+ }
+
+ /* Compare inodes and names */
+ if (file->inode == st.st_ino &&
+ flb_tail_target_file_name_cmp(name, file) == 0) {
+ flb_free(name);
+ return FLB_FALSE;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s",
+ file->inode, file->name, name);
+
+ flb_free(name);
+ return FLB_TRUE;
+}
+
+/* Promote a event in the static list to the dynamic 'events' interface */
+int flb_tail_file_to_event(struct flb_tail_file *file)
+{
+ int ret;
+ struct stat st;
+ struct flb_tail_config *ctx = file->config;
+
+ /* Check if the file promoted have pending bytes */
+ ret = fstat(file->fd, &st);
+ if (ret != 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (file->offset < st.st_size) {
+ file->pending_bytes = (st.st_size - file->offset);
+ tail_signal_pending(file->config);
+ }
+ else {
+ file->pending_bytes = 0;
+ }
+
+ /* Check if the file has been rotated */
+ ret = flb_tail_file_is_rotated(ctx, file);
+ if (ret == FLB_TRUE) {
+ flb_tail_file_rotated(file);
+ }
+
+ /* Notify the fs-event handler that we will start monitoring this 'file' */
+ ret = flb_tail_fs_add(ctx, file);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* List swap: change from 'static' to 'event' list */
+ mk_list_del(&file->_head);
+ ctx->files_static_count--;
+ flb_hash_table_del(ctx->static_hash, file->hash_key);
+
+ mk_list_add(&file->_head, &file->config->files_event);
+ flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+
+ file->tail_mode = FLB_TAIL_EVENT;
+
+ return 0;
+}
+
+/*
+ * Given an open file descriptor, return the filename. This function is a
+ * bit slow and it aims to be used only when a file is rotated.
+ */
+char *flb_tail_file_name(struct flb_tail_file *file)
+{
+ int ret;
+ char *buf;
+#ifdef __linux__
+ ssize_t s;
+ char tmp[128];
+#elif defined(__APPLE__)
+ char path[PATH_MAX];
+#elif defined(FLB_SYSTEM_WINDOWS)
+ HANDLE h;
+#elif defined(FLB_SYSTEM_FREEBSD)
+ struct kinfo_file *file_entries;
+ int file_count;
+ int file_index;
+#endif
+
+ buf = flb_malloc(PATH_MAX);
+ if (!buf) {
+ flb_errno();
+ return NULL;
+ }
+
+#ifdef __linux__
+ ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ s = readlink(tmp, buf, PATH_MAX);
+ if (s == -1) {
+ flb_free(buf);
+ flb_errno();
+ return NULL;
+ }
+ buf[s] = '\0';
+
+#elif __APPLE__
+ int len;
+
+ ret = fcntl(file->fd, F_GETPATH, path);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ len = strlen(path);
+ memcpy(buf, path, len);
+ buf[len] = '\0';
+
+#elif defined(FLB_SYSTEM_WINDOWS)
+ int len;
+
+ h = (HANDLE) _get_osfhandle(file->fd);
+ if (h == INVALID_HANDLE_VALUE) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ /* This function returns the length of the string excluding "\0"
+ * and the resulting path has a "\\?\" prefix.
+ */
+ len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED);
+ if (len == 0 || len >= PATH_MAX) {
+ flb_free(buf);
+ return NULL;
+ }
+
+ if (strstr(buf, "\\\\?\\")) {
+ memmove(buf, buf + 4, len + 1);
+ }
+#elif defined(FLB_SYSTEM_FREEBSD)
+ if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) {
+ flb_free(buf);
+ return NULL;
+ }
+
+ for (file_index=0; file_index < file_count; file_index++) {
+ if (file_entries[file_index].kf_fd == file->fd) {
+ strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1);
+ buf[PATH_MAX - 1] = 0;
+ break;
+ }
+ }
+ free(file_entries);
+#endif
+ return buf;
+}
+
+int flb_tail_file_name_dup(char *path, struct flb_tail_file *file)
+{
+ file->name = flb_strdup(path);
+ if (!file->name) {
+ flb_errno();
+ return -1;
+ }
+ file->name_len = strlen(file->name);
+
+ if (file->real_name) {
+ flb_free(file->real_name);
+ }
+
+ file->real_name = flb_tail_file_name(file);
+ if (!file->real_name) {
+ flb_errno();
+ flb_free(file->name);
+ file->name = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Invoked every time a file was rotated */
+int flb_tail_file_rotated(struct flb_tail_file *file)
+{
+ int ret;
+ uint64_t ts;
+ char *name;
+ char *i_name;
+ char *tmp;
+ struct stat st;
+ struct flb_tail_config *ctx = file->config;
+
+ /* Get the new file name */
+ name = flb_tail_file_name(file);
+ if (!name) {
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s",
+ file->inode, file->name, name);
+
+ /* Update local file entry */
+ tmp = file->name;
+ flb_tail_file_name_dup(name, file);
+ flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s",
+ file->inode, tmp, file->name);
+ if (file->rotated == 0) {
+ file->rotated = time(NULL);
+ mk_list_add(&file->_rotate_head, &file->config->files_rotated);
+
+ /* Rotate the file in the database */
+#ifdef FLB_HAVE_SQLDB
+ if (file->config->db) {
+ ret = flb_tail_db_file_rotate(name, file, file->config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not rotate file %s->%s in database",
+ file->name, name);
+ }
+ }
+#endif
+
+#ifdef FLB_HAVE_METRICS
+ i_name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED,
+ 1, file->config->ins->metrics);
+#endif
+
+ /* Check if a new file has been created */
+ ret = stat(tmp, &st);
+ if (ret == 0 && st.st_ino != file->inode) {
+ if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) {
+ ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx);
+ if (ret == -1) {
+ flb_tail_scan(ctx->path_list, ctx);
+ }
+ else {
+ tail_signal_manager(file->config);
+ }
+ }
+ }
+ }
+ flb_free(tmp);
+ flb_free(name);
+
+ return 0;
+}
+
+static int check_purge_deleted_file(struct flb_tail_config *ctx,
+ struct flb_tail_file *file, time_t ts)
+{
+ int ret;
+ struct stat st;
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name);
+ flb_tail_file_remove(file);
+ return FLB_TRUE;
+ }
+
+ if (st.st_nlink == 0) {
+ flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s",
+ file->name);
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ /* Remove file entry from the database */
+ flb_tail_db_file_delete(file, file->config);
+ }
+#endif
+ /* Remove file from the monitored list */
+ flb_tail_file_remove(file);
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/* Purge rotated and deleted files */
+int flb_tail_file_purge(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ int ret;
+ int count = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+ struct flb_tail_config *ctx = context;
+ time_t now;
+ struct stat st;
+
+ /* Rotated files */
+ now = time(NULL);
+ mk_list_foreach_safe(head, tmp, &ctx->files_rotated) {
+ file = mk_list_entry(head, struct flb_tail_file, _rotate_head);
+ if ((file->rotated + ctx->rotate_wait) <= now) {
+ ret = fstat(file->fd, &st);
+ if (ret == 0) {
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" purge rotated file %s " \
+ "(offset=%"PRId64" / size = %"PRIu64")",
+ file->inode, file->name, file->offset, (uint64_t)st.st_size);
+ if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) {
+ flb_plg_warn(ctx->ins, "purged rotated file while data "
+ "ingestion is paused, consider increasing "
+ "rotate_wait");
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")",
+ file->inode, file->name, file->offset);
+ }
+
+ flb_tail_file_remove(file);
+ count++;
+ }
+ }
+
+ /*
+ * Deleted files: under high load scenarios, exists the chances that in
+ * our event loop we miss some notifications about a file. In order to
+ * sanitize our list of monitored files we will iterate all of them and check
+ * if they have been deleted or not.
+ */
+ mk_list_foreach_safe(head, tmp, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ check_purge_deleted_file(ctx, file, now);
+ }
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ check_purge_deleted_file(ctx, file, now);
+ }
+
+ return count;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_file.h b/src/fluent-bit/plugins/in_tail/tail_file.h
new file mode 100644
index 000000000..796224c37
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_file.h
@@ -0,0 +1,137 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_FILE_H
+#define FLB_TAIL_FILE_H
+
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_input.h>
+
+#include "tail.h"
+#include "tail_fs.h"
+#include "tail_config.h"
+#include "tail_file_internal.h"
+
+#ifdef FLB_SYSTEM_WINDOWS
+#include "win32.h"
+#endif
+
+#ifdef FLB_HAVE_REGEX
+#define FLB_HASH_TABLE_SIZE 50
+#endif
+
+/* return the file modification time in seconds since epoch */
+static inline int64_t flb_tail_stat_mtime(struct stat *st)
+{
+#if defined(FLB_HAVE_WINDOWS)
+ return (int64_t) st->st_mtime;
+#elif defined(__APPLE__) && !defined(_POSIX_C_SOURCE)
+ return (int64_t) st->st_mtimespec.tv_sec;
+#elif (_POSIX_C_SOURCE >= 200809L || \
+ defined(_BSD_SOURCE) || defined(_SVID_SOURCE) || \
+ defined(__BIONIC__) || (defined (__SVR4) && defined (__sun)) || \
+ defined(__FreeBSD__) || defined (__linux__))
+ return (int64_t) st->st_mtim.tv_sec;
+#elif defined(_AIX)
+ return (int64_t) st->st_mtime;
+#else
+ return (int64_t) st->st_mtime;
+#endif
+
+ /* backend unsupported: submit a PR :) */
+ return -1;
+}
+
+static inline int flb_tail_target_file_name_cmp(char *name,
+ struct flb_tail_file *file)
+{
+ int ret;
+ char *name_a = NULL;
+ char *name_b = NULL;
+ char *base_a = NULL;
+ char *base_b = NULL;
+
+ name_a = flb_strdup(name);
+ if (!name_a) {
+ flb_errno();
+ ret = -1;
+ goto out;
+ }
+
+ base_a = flb_strdup(basename(name_a));
+ if (!base_a) {
+ flb_errno();
+ ret = -1;
+ goto out;
+ }
+
+#if defined(FLB_SYSTEM_WINDOWS)
+ name_b = flb_strdup(file->real_name);
+ if (!name_b) {
+ flb_errno();
+ ret = -1;
+ goto out;
+ }
+
+ base_b = basename(name_b);
+ ret = _stricmp(base_a, base_b);
+#else
+ name_b = flb_strdup(file->real_name);
+ if (!name_b) {
+ flb_errno();
+ ret = -1;
+ goto out;
+ }
+ base_b = basename(name_b);
+ ret = strcmp(base_a, base_b);
+#endif
+
+ out:
+ flb_free(name_a);
+ flb_free(name_b);
+ flb_free(base_a);
+
+ /* FYI: 'base_b' never points to a new allocation, no flb_free is needed */
+
+ return ret;
+}
+
+int flb_tail_file_name_dup(char *path, struct flb_tail_file *file);
+int flb_tail_file_to_event(struct flb_tail_file *file);
+int flb_tail_file_chunk(struct flb_tail_file *file);
+int flb_tail_file_append(char *path, struct stat *st, int mode,
+ struct flb_tail_config *ctx);
+void flb_tail_file_remove(struct flb_tail_file *file);
+int flb_tail_file_remove_all(struct flb_tail_config *ctx);
+char *flb_tail_file_name(struct flb_tail_file *file);
+int flb_tail_file_is_rotated(struct flb_tail_config *ctx,
+ struct flb_tail_file *file);
+int flb_tail_file_rotated(struct flb_tail_file *file);
+int flb_tail_file_purge(struct flb_input_instance *ins,
+ struct flb_config *config, void *context);
+int flb_tail_pack_line_map(struct flb_time *time, char **data,
+ size_t *data_size, struct flb_tail_file *file,
+ size_t processed_bytes);
+int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size,
+ struct flb_tail_file *file, size_t processed_bytes);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_file_internal.h b/src/fluent-bit/plugins/in_tail/tail_file_internal.h
new file mode 100644
index 000000000..6d95c87c1
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_file_internal.h
@@ -0,0 +1,130 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_INTERNAL_H
+#define FLB_TAIL_INTERNAL_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#ifdef FLB_HAVE_PARSER
+#include <fluent-bit/multiline/flb_ml.h>
+#endif
+
+#include "tail.h"
+#include "tail_config.h"
+
+struct flb_tail_file {
+ /* Inotify */
+ int watch_fd;
+ /* file lookup info */
+ int fd;
+ int64_t size;
+ int64_t offset;
+ int64_t last_line;
+ uint64_t dev_id;
+ uint64_t inode;
+ uint64_t link_inode;
+ int is_link;
+ char *name; /* target file name given by scan routine */
+ char *real_name; /* real file name in the file system */
+ char *orig_name; /* original file name (before rotation) */
+ size_t name_len;
+ size_t orig_name_len;
+ time_t rotated;
+ int64_t pending_bytes;
+
+ /* dynamic tag for this file */
+ int tag_len;
+ char *tag_buf;
+
+ /* OLD multiline */
+ time_t mult_flush_timeout; /* time when multiline started */
+ int mult_firstline; /* bool: mult firstline found ? */
+ int mult_firstline_append; /* bool: mult firstline appendable ? */
+ int mult_skipping; /* skipping because ignode_older than ? */
+ int mult_keys; /* total number of buffered keys */
+
+ int mult_records; /* multiline records counter mult_sbuf */
+ msgpack_sbuffer mult_sbuf; /* temporary msgpack buffer */
+ msgpack_packer mult_pck; /* temporary msgpack packer */
+ struct flb_time mult_time; /* multiline time parsed from first line */
+
+ /* OLD docker mode */
+ time_t dmode_flush_timeout; /* time when docker mode started */
+ flb_sds_t dmode_buf; /* buffer for docker mode */
+ flb_sds_t dmode_lastline; /* last incomplete line */
+ bool dmode_complete; /* buffer contains completed log */
+ bool dmode_firstline; /* dmode mult firstline found ? */
+
+ /* multiline engine: file stream_id and local buffers */
+ uint64_t ml_stream_id;
+
+ /* content parsing, positions and buffer */
+ size_t parsed;
+ size_t buf_len;
+ size_t buf_size;
+ char *buf_data;
+
+ /*
+ * This value represent the number of bytes procesed by process_content()
+ * in the last iteration.
+ */
+ size_t last_processed_bytes;
+
+ /*
+ * Long-lines handling: this flag is enabled when a previous line was
+ * too long and the buffer did not contain a \n, so when reaching the
+ * missing \n, skip that content and move forward.
+ *
+ * This flag is only set when Skip_Long_Lines is On.
+ */
+ int skip_next;
+
+ /* Did the plugin already warn the user about long lines ? */
+ int skip_warn;
+
+ /* Opaque data type for specific fs-event backend data */
+ void *fs_backend;
+
+ /* database reference */
+ uint64_t db_id;
+
+ uint64_t hash_bits;
+ flb_sds_t hash_key;
+
+ /* There are dedicated log event encoders for
+ * single and multi line events because I am respecting
+ * the old behavior which resulted in grouping both types
+ * of logs in tail_file.c but I don't know if this is
+ * strictly necessary.
+ */
+ struct flb_log_event_encoder *ml_log_event_encoder;
+ struct flb_log_event_encoder *sl_log_event_encoder;
+
+ /* reference */
+ int tail_mode;
+ struct flb_tail_config *config;
+ struct mk_list _head;
+ struct mk_list _rotate_head;
+};
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_fs.h b/src/fluent-bit/plugins/in_tail/tail_fs.h
new file mode 100644
index 000000000..948954333
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_fs.h
@@ -0,0 +1,96 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_FS_H
+#define FLB_TAIL_FS_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+#include "tail_config.h"
+#include "tail_file_internal.h"
+
+#include "tail_fs_stat.h"
+#ifdef FLB_HAVE_INOTIFY
+#include "tail_fs_inotify.h"
+#endif
+
+static inline int flb_tail_fs_init(struct flb_input_instance *in,
+ struct flb_tail_config *ctx, struct flb_config *config)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_init(in, ctx, config);
+ }
+#endif
+ return flb_tail_fs_stat_init(in, ctx, config);
+}
+
+static inline void flb_tail_fs_pause(struct flb_tail_config *ctx)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_pause(ctx);
+ }
+#endif
+ return flb_tail_fs_stat_pause(ctx);
+}
+
+static inline void flb_tail_fs_resume(struct flb_tail_config *ctx)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_resume(ctx);
+ }
+#endif
+ return flb_tail_fs_stat_resume(ctx);
+}
+
+static inline int flb_tail_fs_add(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_add(file);
+ }
+#endif
+ return flb_tail_fs_stat_add(file);
+}
+
+static inline int flb_tail_fs_remove(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_remove(file);
+ }
+#endif
+ return flb_tail_fs_stat_remove(file);
+}
+
+static inline int flb_tail_fs_exit(struct flb_tail_config *ctx)
+{
+#ifdef FLB_HAVE_INOTIFY
+ if (ctx->inotify_watcher) {
+ return flb_tail_fs_inotify_exit(ctx);
+ }
+#endif
+ return flb_tail_fs_stat_exit(ctx);
+}
+
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_fs_inotify.c b/src/fluent-bit/plugins/in_tail/tail_fs_inotify.c
new file mode 100644
index 000000000..59d10ca08
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_fs_inotify.c
@@ -0,0 +1,433 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _DEFAULT_SOURCE
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/inotify.h>
+
+#include "tail_config.h"
+#include "tail_file.h"
+#include "tail_db.h"
+#include "tail_signal.h"
+
+#include <limits.h>
+#include <fcntl.h>
+
+#include <sys/ioctl.h>
+
+static int debug_event_mask(struct flb_tail_config *ctx,
+ struct flb_tail_file *file,
+ uint32_t mask)
+{
+ flb_sds_t buf;
+ int buf_size = 256;
+
+ /* Only enter this function if debug mode is allowed */
+ if (flb_log_check(FLB_LOG_DEBUG) == 0) {
+ return 0;
+ }
+
+ if (file) {
+ buf_size = file->name_len + 128;
+ }
+
+ if (buf_size < 256) {
+ buf_size = 256;
+ }
+
+ /* Create buffer */
+ buf = flb_sds_create_size(buf_size);
+ if (!buf) {
+ return -1;
+ }
+
+ /* Print info into sds */
+ if (file) {
+ flb_sds_printf(&buf, "inode=%"PRIu64", %s, events: ", file->inode, file->name);
+ }
+ else {
+ flb_sds_printf(&buf, "events: ");
+ }
+
+ if (mask & IN_ATTRIB) {
+ flb_sds_printf(&buf, "IN_ATTRIB ");
+ }
+ if (mask & IN_IGNORED) {
+ flb_sds_printf(&buf, "IN_IGNORED ");
+ }
+ if (mask & IN_MODIFY) {
+ flb_sds_printf(&buf, "IN_MODIFY ");
+ }
+ if (mask & IN_MOVE_SELF) {
+ flb_sds_printf(&buf, "IN_MOVE_SELF ");
+ }
+ if (mask & IN_Q_OVERFLOW) {
+ flb_sds_printf(&buf, "IN_Q_OVERFLOW ");
+ }
+
+ flb_plg_debug(ctx->ins, "%s", buf);
+ flb_sds_destroy(buf);
+
+ return 0;
+}
+
+static int tail_fs_add(struct flb_tail_file *file, int check_rotated)
+{
+ int flags;
+ int watch_fd;
+ char *name;
+ struct flb_tail_config *ctx = file->config;
+
+ /*
+ * If there is no watcher associated, we only want to monitor events if
+ * this file is rotated to somewhere. Note at this point we are polling
+ * lines from the file and once we reach EOF (and a watch_fd exists),
+ * we update the flags to receive notifications.
+ */
+ flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW;
+
+ if (check_rotated == FLB_TRUE) {
+ flags |= IN_MOVE_SELF;
+ }
+
+ /*
+ * Double check real name of the file associated to the inode:
+ *
+ * Inotify interface in the Kernel uses the inode number as a real reference
+ * for the file we have opened. If for some reason the file we are pointing
+ * out in file->name has been rotated and not been updated, we might not add
+ * the watch to the real file we aim to.
+ *
+ * A case like this can generate the issue:
+ *
+ * 1. inode=1 : file a.log is being watched
+ * 2. inode=1 : file a.log is rotated to a.log.1, but notification not
+ * delivered yet.
+ * 3. inode=2 : new file 'a.log' is created
+ * 4. inode=2 : the scan_path routine discover the new 'a.log' file
+ * 5. inode=2 : add an inotify watcher for 'a.log'
+ * 6. conflict: inotify_add_watch() receives the path 'a.log',
+ */
+
+ name = flb_tail_file_name(file);
+ if (!name) {
+ flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot get real filename for inotify",
+ file->inode);
+ return -1;
+ }
+
+ /* Register or update the flags */
+ watch_fd = inotify_add_watch(ctx->fd_notify, name, flags);
+ flb_free(name);
+
+ if (watch_fd == -1) {
+ flb_errno();
+ if (errno == ENOSPC) {
+ flb_plg_error(ctx->ins, "inotify: The user limit on the total "
+ "number of inotify watches was reached or the kernel "
+ "failed to allocate a needed resource (ENOSPC)");
+ }
+ return -1;
+ }
+ file->watch_fd = watch_fd;
+ flb_plg_info(ctx->ins, "inotify_fs_add(): inode=%"PRIu64" watch_fd=%i name=%s",
+ file->inode, watch_fd, file->name);
+ return 0;
+}
+
+static int flb_tail_fs_add_rotated(struct flb_tail_file *file)
+{
+ return tail_fs_add(file, FLB_FALSE);
+}
+
+static int tail_fs_event(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ int64_t offset;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_tail_config *ctx = in_context;
+ struct flb_tail_file *file = NULL;
+ struct inotify_event ev;
+ struct stat st;
+
+ /* Read the event */
+ ret = read(ctx->fd_notify, &ev, sizeof(struct inotify_event));
+ if (ret < 1) {
+ return -1;
+ }
+
+ /* Lookup watched file */
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ if (file->watch_fd != ev.wd) {
+ file = NULL;
+ continue;
+ }
+ break;
+ }
+
+ if (!file) {
+ return -1;
+ }
+
+ /* Debug event */
+ debug_event_mask(ctx, file, ev.mask);
+
+ if (ev.mask & IN_IGNORED) {
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" watch_fd=%i IN_IGNORED",
+ file->inode, ev.wd);
+ return -1;
+ }
+
+ /* Check file rotation (only if it has not been rotated before) */
+ if (ev.mask & IN_MOVE_SELF && file->rotated == 0) {
+ flb_plg_debug(ins, "inode=%"PRIu64" rotated IN_MOVE SELF '%s'",
+ file->inode, file->name);
+
+ /* A rotated file must be re-registered */
+ flb_tail_file_rotated(file);
+ flb_tail_fs_remove(ctx, file);
+ flb_tail_fs_add_rotated(file);
+ }
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing",
+ file->inode, file->name);
+ flb_tail_file_remove(file);
+ return 0;
+ }
+ file->size = st.st_size;
+ file->pending_bytes = (file->size - file->offset);
+
+ /* File was removed ? */
+ if (ev.mask & IN_ATTRIB) {
+ /* Check if the file have been deleted */
+ if (st.st_nlink == 0) {
+ flb_plg_debug(ins, "inode=%"PRIu64" file has been deleted: %s",
+ file->inode, file->name);
+
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ /* Remove file entry from the database */
+ flb_tail_db_file_delete(file, ctx);
+ }
+#endif
+ /* Remove file from the monitored list */
+ flb_tail_file_remove(file);
+ return 0;
+ }
+ }
+
+ if (ev.mask & IN_MODIFY) {
+ /*
+ * The file was modified, check how many new bytes do
+ * we have.
+ */
+
+ /* Check if the file was truncated */
+ if (file->offset > st.st_size) {
+ offset = lseek(file->fd, 0, SEEK_SET);
+ if (offset == -1) {
+ flb_errno();
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
+ file->inode, file->name);
+ file->offset = offset;
+ file->buf_len = 0;
+
+ /* Update offset in the database file */
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ flb_tail_db_file_offset(file, ctx);
+ }
+#endif
+ }
+ }
+
+ /* Collect the data */
+ ret = in_tail_collect_event(file, config);
+ if (ret != FLB_TAIL_ERROR) {
+ /*
+ * Due to read buffer size capacity, there are some cases where the
+ * read operation cannot consume all new data available on one
+ * round; upon successfull read(2) some data can still remain.
+ *
+ * If that is the case, we set in the structure how
+ * many bytes are available 'now', so then the further
+ * routine that check pending bytes and then the inotified-file
+ * can process them properly after an internal signal.
+ *
+ * The goal to defer this routine is to avoid a blocking
+ * read(2) operation, that might kill performance. Just let's
+ * wait a second and do a good job.
+ */
+ tail_signal_pending(ctx);
+ }
+ else {
+ return ret;
+ }
+
+ return 0;
+}
+
+static int in_tail_progress_check_callback(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ int ret = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_config *ctx = context;
+ struct flb_tail_file *file;
+ int pending_data_detected;
+ struct stat st;
+
+ (void) config;
+
+ pending_data_detected = FLB_FALSE;
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+
+ if (file->offset < file->size) {
+ pending_data_detected = FLB_TRUE;
+
+ continue;
+ }
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ flb_plg_error(ins, "fstat error");
+
+ continue;
+ }
+
+ if (file->offset < st.st_size) {
+ file->size = st.st_size;
+ file->pending_bytes = (file->size - file->offset);
+
+ pending_data_detected = FLB_TRUE;
+ }
+ }
+
+ if (pending_data_detected) {
+ tail_signal_pending(ctx);
+ }
+
+ return 0;
+}
+
+/* File System events based on Inotify(2). Linux >= 2.6.32 is suggested */
+int flb_tail_fs_inotify_init(struct flb_input_instance *in,
+ struct flb_tail_config *ctx, struct flb_config *config)
+{
+ int fd;
+ int ret;
+
+ flb_plg_debug(ctx->ins, "flb_tail_fs_inotify_init() initializing inotify tail input");
+
+ /* Create inotify instance */
+ fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
+ if (fd == -1) {
+ flb_errno();
+ return -1;
+ }
+ flb_plg_debug(ctx->ins, "inotify watch fd=%i", fd);
+ ctx->fd_notify = fd;
+
+ /* This backend use Fluent Bit event-loop to trigger notifications */
+ ret = flb_input_set_collector_event(in, tail_fs_event,
+ ctx->fd_notify, config);
+ if (ret < 0) {
+ close(fd);
+ return -1;
+ }
+ ctx->coll_fd_fs1 = ret;
+
+ /* Register callback to check current tail offsets */
+ ret = flb_input_set_collector_time(in, in_tail_progress_check_callback,
+ ctx->progress_check_interval,
+ ctx->progress_check_interval_nsec,
+ config);
+ if (ret == -1) {
+ flb_tail_config_destroy(ctx);
+ return -1;
+ }
+ ctx->coll_fd_progress_check = ret;
+
+ return 0;
+}
+
+void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx)
+{
+ flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins);
+}
+
+void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx)
+{
+ flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins);
+}
+
+int flb_tail_fs_inotify_add(struct flb_tail_file *file)
+{
+ int ret;
+ struct flb_tail_config *ctx = file->config;
+
+ ret = tail_fs_add(file, FLB_TRUE);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "inode=%"PRIu64" cannot register file %s",
+ file->inode, file->name);
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_tail_fs_inotify_remove(struct flb_tail_file *file)
+{
+ struct flb_tail_config *ctx = file->config;
+
+ if (file->watch_fd == -1) {
+ return 0;
+ }
+
+ flb_plg_info(ctx->ins, "inotify_fs_remove(): inode=%"PRIu64" watch_fd=%i",
+ file->inode, file->watch_fd);
+
+ inotify_rm_watch(file->config->fd_notify, file->watch_fd);
+ file->watch_fd = -1;
+ return 0;
+}
+
+int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx)
+{
+ return close(ctx->fd_notify);
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_fs_inotify.h b/src/fluent-bit/plugins/in_tail/tail_fs_inotify.h
new file mode 100644
index 000000000..128ab0624
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_fs_inotify.h
@@ -0,0 +1,37 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_FS_INOTIFY_H
+#define FLB_TAIL_FS_INOTIFY_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+#include "tail_config.h"
+#include "tail_file_internal.h"
+
+int flb_tail_fs_inotify_init(struct flb_input_instance *in,
+ struct flb_tail_config *ctx, struct flb_config *config);
+int flb_tail_fs_inotify_add(struct flb_tail_file *file);
+int flb_tail_fs_inotify_remove(struct flb_tail_file *file);
+int flb_tail_fs_inotify_exit(struct flb_tail_config *ctx);
+void flb_tail_fs_inotify_pause(struct flb_tail_config *ctx);
+void flb_tail_fs_inotify_resume(struct flb_tail_config *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_fs_stat.c b/src/fluent-bit/plugins/in_tail/tail_fs_stat.c
new file mode 100644
index 000000000..6b312c9bd
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_fs_stat.c
@@ -0,0 +1,253 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _DEFAULT_SOURCE
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "tail_file.h"
+#include "tail_db.h"
+#include "tail_config.h"
+#include "tail_signal.h"
+
+#ifdef FLB_SYSTEM_WINDOWS
+#include "win32.h"
+#endif
+
+struct fs_stat {
+ /* last time check */
+ time_t checked;
+
+ /* previous status */
+ struct stat st;
+};
+
+static int tail_fs_event(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_tail_config *ctx = in_context;
+ struct flb_tail_file *file = NULL;
+ struct fs_stat *fst;
+ struct stat st;
+ time_t t;
+
+ t = time(NULL);
+
+ /* Lookup watched file */
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ fst = file->fs_backend;
+
+ /* Check current status of the file */
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ continue;
+ }
+
+ /* Check if the file was modified */
+ if ((fst->st.st_mtime != st.st_mtime) ||
+ (fst->st.st_size != st.st_size)) {
+ /* Update stat info and trigger the notification */
+ memcpy(&fst->st, &st, sizeof(struct stat));
+ fst->checked = t;
+ in_tail_collect_event(file, config);
+ }
+ }
+
+ return 0;
+}
+
+static int tail_fs_check(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ int64_t offset;
+ char *name;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_config *ctx = in_context;
+ struct flb_tail_file *file = NULL;
+ struct fs_stat *fst;
+ struct stat st;
+
+ /* Lookup watched file */
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ fst = file->fs_backend;
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name);
+ flb_tail_file_remove(file);
+ continue;
+ }
+
+ /* Check if the file have been deleted */
+ if (st.st_nlink == 0) {
+ flb_plg_debug(ctx->ins, "file has been deleted: %s", file->name);
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ /* Remove file entry from the database */
+ flb_tail_db_file_delete(file, ctx);
+ }
+#endif
+ flb_tail_file_remove(file);
+ continue;
+ }
+
+ /* Check if the file was truncated */
+ if (file->offset > st.st_size) {
+ offset = lseek(file->fd, 0, SEEK_SET);
+ if (offset == -1) {
+ flb_errno();
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "file truncated %s", file->name);
+ file->offset = offset;
+ file->buf_len = 0;
+ memcpy(&fst->st, &st, sizeof(struct stat));
+
+#ifdef FLB_HAVE_SQLDB
+ /* Update offset in database file */
+ if (ctx->db) {
+ flb_tail_db_file_offset(file, ctx);
+ }
+#endif
+ }
+
+ if (file->offset < st.st_size) {
+ file->pending_bytes = (st.st_size - file->offset);
+ tail_signal_pending(ctx);
+ }
+ else {
+ file->pending_bytes = 0;
+ }
+
+
+ /* Discover the current file name for the open file descriptor */
+ name = flb_tail_file_name(file);
+ if (!name) {
+ flb_plg_debug(ctx->ins, "could not resolve %s, removing", file->name);
+ flb_tail_file_remove(file);
+ continue;
+ }
+
+ /*
+ * Check if file still exists. This method requires explicity that the
+ * user is using an absolute path, otherwise we will be rotating the
+ * wrong file.
+ *
+ * flb_tail_target_file_name_cmp is a deeper compare than
+ * flb_tail_file_name_cmp. If applicable, it compares to the underlying
+ * real_name of the file.
+ */
+ if (flb_tail_file_is_rotated(ctx, file) == FLB_TRUE) {
+ flb_tail_file_rotated(file);
+ }
+ flb_free(name);
+
+ }
+
+ return 0;
+}
+
+/* File System events based on stat(2) */
+int flb_tail_fs_stat_init(struct flb_input_instance *in,
+ struct flb_tail_config *ctx, struct flb_config *config)
+{
+ int ret;
+
+ flb_plg_debug(ctx->ins, "flb_tail_fs_stat_init() initializing stat tail input");
+
+ /* Set a manual timer to collect events every 0.250 seconds */
+ ret = flb_input_set_collector_time(in, tail_fs_event,
+ 0, 250000000, config);
+ if (ret < 0) {
+ return -1;
+ }
+ ctx->coll_fd_fs1 = ret;
+
+ /* Set a manual timer to check deleted/rotated files every 2.5 seconds */
+ ret = flb_input_set_collector_time(in, tail_fs_check,
+ 2, 500000000, config);
+ if (ret < 0) {
+ return -1;
+ }
+ ctx->coll_fd_fs2 = ret;
+
+ return 0;
+}
+
+void flb_tail_fs_stat_pause(struct flb_tail_config *ctx)
+{
+ flb_input_collector_pause(ctx->coll_fd_fs1, ctx->ins);
+ flb_input_collector_pause(ctx->coll_fd_fs2, ctx->ins);
+}
+
+void flb_tail_fs_stat_resume(struct flb_tail_config *ctx)
+{
+ flb_input_collector_resume(ctx->coll_fd_fs1, ctx->ins);
+ flb_input_collector_resume(ctx->coll_fd_fs2, ctx->ins);
+}
+
+int flb_tail_fs_stat_add(struct flb_tail_file *file)
+{
+ int ret;
+ struct fs_stat *fst;
+
+ fst = flb_malloc(sizeof(struct fs_stat));
+ if (!fst) {
+ flb_errno();
+ return -1;
+ }
+
+ fst->checked = time(NULL);
+ ret = stat(file->name, &fst->st);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(fst);
+ return -1;
+ }
+ file->fs_backend = fst;
+
+ return 0;
+}
+
+int flb_tail_fs_stat_remove(struct flb_tail_file *file)
+{
+ if (file->tail_mode == FLB_TAIL_EVENT) {
+ flb_free(file->fs_backend);
+ }
+ return 0;
+}
+
+int flb_tail_fs_stat_exit(struct flb_tail_config *ctx)
+{
+ (void) ctx;
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_fs_stat.h b/src/fluent-bit/plugins/in_tail/tail_fs_stat.h
new file mode 100644
index 000000000..21a0704cb
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_fs_stat.h
@@ -0,0 +1,37 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_FS_STAT_H
+#define FLB_TAIL_FS_STAT_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+#include "tail_config.h"
+#include "tail_file_internal.h"
+
+int flb_tail_fs_stat_init(struct flb_input_instance *in,
+ struct flb_tail_config *ctx, struct flb_config *config);
+int flb_tail_fs_stat_add(struct flb_tail_file *file);
+int flb_tail_fs_stat_remove(struct flb_tail_file *file);
+int flb_tail_fs_stat_exit(struct flb_tail_config *ctx);
+void flb_tail_fs_stat_pause(struct flb_tail_config *ctx);
+void flb_tail_fs_stat_resume(struct flb_tail_config *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_multiline.c b/src/fluent-bit/plugins/in_tail/tail_multiline.c
new file mode 100644
index 000000000..71c031014
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_multiline.c
@@ -0,0 +1,606 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_kv.h>
+
+#include "tail_config.h"
+#include "tail_multiline.h"
+
+static int tail_mult_append(struct flb_parser *parser,
+ struct flb_tail_config *ctx)
+{
+ struct flb_tail_mult *mp;
+
+ mp = flb_malloc(sizeof(struct flb_tail_mult));
+ if (!mp) {
+ flb_errno();
+ return -1;
+ }
+
+ mp->parser = parser;
+ mk_list_add(&mp->_head, &ctx->mult_parsers);
+
+ return 0;
+}
+
+int flb_tail_mult_create(struct flb_tail_config *ctx,
+ struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ const char *tmp;
+ struct mk_list *head;
+ struct flb_parser *parser;
+ struct flb_kv *kv;
+
+ if (ctx->multiline_flush <= 0) {
+ ctx->multiline_flush = 1;
+ }
+
+ mk_list_init(&ctx->mult_parsers);
+
+ /* Get firstline parser */
+ tmp = flb_input_get_property("parser_firstline", ins);
+ if (!tmp) {
+ flb_plg_error(ctx->ins, "multiline: no parser defined for firstline");
+ return -1;
+ }
+ parser = flb_parser_get(tmp, config);
+ if (!parser) {
+ flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp);
+ return -1;
+ }
+
+ ctx->mult_parser_firstline = parser;
+
+ /* Read all multiline rules */
+ mk_list_foreach(head, &ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ if (strcasecmp("parser_firstline", kv->key) == 0) {
+ continue;
+ }
+
+ if (strncasecmp("parser_", kv->key, 7) == 0) {
+ parser = flb_parser_get(kv->val, config);
+ if (!parser) {
+ flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val);
+ return -1;
+ }
+
+ ret = tail_mult_append(parser, ctx);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+int flb_tail_mult_destroy(struct flb_tail_config *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_mult *mp;
+
+ if (ctx->multiline == FLB_FALSE) {
+ return 0;
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) {
+ mp = mk_list_entry(head, struct flb_tail_mult, _head);
+ mk_list_del(&mp->_head);
+ flb_free(mp);
+ }
+
+ return 0;
+}
+
+/* Process the result of a firstline match */
+int flb_tail_mult_process_first(time_t now,
+ char *buf, size_t size,
+ struct flb_time *out_time,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+ size_t off;
+ msgpack_object map;
+ msgpack_unpacked result;
+
+ /* If a previous multiline context already exists, flush first */
+ if (file->mult_firstline && !file->mult_skipping) {
+ flb_tail_mult_flush(file, ctx);
+ }
+
+ /* Remark as first multiline message */
+ file->mult_firstline = FLB_TRUE;
+
+ /* Validate obtained time, if not set, set the current time */
+ if (flb_time_to_nanosec(out_time) == 0L) {
+ flb_time_get(out_time);
+ }
+
+ /* Should we skip this multiline record ? */
+ if (ctx->ignore_older > 0) {
+ if ((now - ctx->ignore_older) > out_time->tm.tv_sec) {
+ flb_free(buf);
+ file->mult_skipping = FLB_TRUE;
+ file->mult_firstline = FLB_TRUE;
+
+ /* we expect more data to skip */
+ return FLB_TAIL_MULT_MORE;
+ }
+ }
+
+ /* Re-initiate buffers */
+ msgpack_sbuffer_init(&file->mult_sbuf);
+ msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write);
+
+ /*
+ * flb_parser_do() always return a msgpack buffer, so we tweak our
+ * local msgpack reference to avoid an extra allocation. The only
+ * concern is that we don't know what's the real size of the memory
+ * allocated, so we assume it's just 'out_size'.
+ */
+ file->mult_flush_timeout = now + (ctx->multiline_flush - 1);
+ file->mult_sbuf.data = buf;
+ file->mult_sbuf.size = size;
+ file->mult_sbuf.alloc = size;
+
+ /* Set multiline status */
+ file->mult_firstline = FLB_TRUE;
+ file->mult_skipping = FLB_FALSE;
+ flb_time_copy(&file->mult_time, out_time);
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ msgpack_sbuffer_destroy(&file->mult_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return FLB_TAIL_MULT_NA;
+ }
+
+ map = result.data;
+ file->mult_keys = map.via.map.size;
+ msgpack_unpacked_destroy(&result);
+
+ /* We expect more data */
+ return FLB_TAIL_MULT_MORE;
+}
+
+/* Append a raw log entry to the last structured field in the mult buffer */
+static inline void flb_tail_mult_append_raw(char *buf, int size,
+ struct flb_tail_file *file,
+ struct flb_tail_config *config)
+{
+ /* Append the raw string */
+ msgpack_pack_str(&file->mult_pck, size);
+ msgpack_pack_str_body(&file->mult_pck, buf, size);
+}
+
+/* Check if the last key value type of a map is string or not */
+static inline int is_last_key_val_string(char *buf, size_t size)
+{
+ int ret = FLB_FALSE;
+ size_t off;
+ msgpack_unpacked result;
+ msgpack_object v;
+ msgpack_object root;
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ return ret;
+ }
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ ret = FLB_FALSE;
+ }
+ else {
+ if (root.via.map.size == 0) {
+ ret = FLB_FALSE;
+ }
+ else {
+ v = root.via.map.ptr[root.via.map.size - 1].val;
+ if (v.type == MSGPACK_OBJECT_STR) {
+ ret = FLB_TRUE;
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+ return ret;
+}
+
+int flb_tail_mult_process_content(time_t now,
+ char *buf, size_t len,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx,
+ size_t processed_bytes)
+{
+ int ret;
+ size_t off;
+ void *out_buf;
+ size_t out_size = 0;
+ struct mk_list *head;
+ struct flb_tail_mult *mult_parser = NULL;
+ struct flb_time out_time = {0};
+ msgpack_object map;
+ msgpack_unpacked result;
+
+ /* Always check if this line is the beginning of a new multiline message */
+ ret = flb_parser_do(ctx->mult_parser_firstline,
+ buf, len,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ /*
+ * The content is a candidate for a firstline, but we need to perform
+ * the extra-mandatory check where the last key value type must be
+ * a string, otherwise no string concatenation with continuation lines
+ * will be possible.
+ */
+ ret = is_last_key_val_string(out_buf, out_size);
+ if (ret == FLB_TRUE)
+ file->mult_firstline_append = FLB_TRUE;
+ else
+ file->mult_firstline_append = FLB_FALSE;
+
+ flb_tail_mult_process_first(now, out_buf, out_size, &out_time,
+ file, ctx);
+ return FLB_TAIL_MULT_MORE;
+ }
+
+ if (file->mult_skipping == FLB_TRUE) {
+ return FLB_TAIL_MULT_MORE;
+ }
+
+ /*
+ * Once here means we have some data that is a continuation, iterate
+ * parsers trying to find a match
+ */
+ out_buf = NULL;
+ mk_list_foreach(head, &ctx->mult_parsers) {
+ mult_parser = mk_list_entry(head, struct flb_tail_mult, _head);
+
+ /* Process line text with current parser */
+ out_buf = NULL;
+ out_size = 0;
+ ret = flb_parser_do(mult_parser->parser,
+ buf, len,
+ &out_buf, &out_size, &out_time);
+ if (ret < 0) {
+ mult_parser = NULL;
+ continue;
+ }
+
+ /* The line was processed, break the loop and buffer the data */
+ break;
+ }
+
+ if (!mult_parser) {
+ /*
+ * If no parser was found means the string log must be appended
+ * to the last structured field.
+ */
+ if (file->mult_firstline && file->mult_firstline_append) {
+ flb_tail_mult_append_raw(buf, len, file, ctx);
+ }
+ else {
+ flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes);
+ }
+
+ return FLB_TAIL_MULT_MORE;
+ }
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, out_buf, out_size, &off);
+ map = result.data;
+
+ /* Append new map to our local msgpack buffer */
+ file->mult_keys += map.via.map.size;
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size);
+ flb_free(out_buf);
+
+ return FLB_TAIL_MULT_MORE;
+}
+
+static int flb_tail_mult_pack_line_body(
+ struct flb_log_event_encoder *context,
+ struct flb_tail_file *file)
+{
+ size_t adjacent_object_offset;
+ size_t continuation_length;
+ msgpack_unpacked adjacent_object;
+ msgpack_unpacked current_object;
+ size_t entry_index;
+ msgpack_object entry_value;
+ msgpack_object entry_key;
+ msgpack_object_map *data_map;
+ int map_size;
+ size_t offset;
+ struct flb_tail_config *config;
+ int result;
+
+ result = FLB_EVENT_ENCODER_SUCCESS;
+ config = (struct flb_tail_config *) file->config;
+
+ /* New Map size */
+ map_size = file->mult_keys;
+
+ if (file->config->path_key != NULL) {
+ map_size++;
+
+ result = flb_log_event_encoder_append_body_values(
+ context,
+ FLB_LOG_EVENT_CSTRING_VALUE(config->path_key),
+ FLB_LOG_EVENT_CSTRING_VALUE(file->name));
+ }
+
+
+ msgpack_unpacked_init(&current_object);
+ msgpack_unpacked_init(&adjacent_object);
+
+ offset = 0;
+
+ while (result == FLB_EVENT_ENCODER_SUCCESS &&
+ msgpack_unpack_next(&current_object,
+ file->mult_sbuf.data,
+ file->mult_sbuf.size,
+ &offset) == MSGPACK_UNPACK_SUCCESS) {
+ if (current_object.data.type != MSGPACK_OBJECT_MAP) {
+ continue;
+ }
+
+ data_map = &current_object.data.via.map;
+
+ continuation_length = 0;
+
+ for (entry_index = 0; entry_index < data_map->size; entry_index++) {
+ entry_key = data_map->ptr[entry_index].key;
+ entry_value = data_map->ptr[entry_index].val;
+
+ result = flb_log_event_encoder_append_body_msgpack_object(context,
+ &entry_key);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+
+ /* Check if this is the last entry in the map and if that is
+ * the case then add the lengths of all the trailing string
+ * objects after the map in order to append them to the value
+ * but only if the value object is a string
+ */
+ if (entry_index + 1 == data_map->size &&
+ entry_value.type == MSGPACK_OBJECT_STR) {
+ adjacent_object_offset = offset;
+
+ while (msgpack_unpack_next(
+ &adjacent_object,
+ file->mult_sbuf.data,
+ file->mult_sbuf.size,
+ &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) {
+ if (adjacent_object.data.type != MSGPACK_OBJECT_STR) {
+ break;
+ }
+
+ /* Sum total bytes to append */
+ continuation_length += adjacent_object.data.via.str.size + 1;
+ }
+
+ result = flb_log_event_encoder_append_body_string_length(
+ context,
+ entry_value.via.str.size +
+ continuation_length);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+
+ result = flb_log_event_encoder_append_body_string_body(
+ context,
+ (char *) entry_value.via.str.ptr,
+ entry_value.via.str.size);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+
+ if (continuation_length > 0) {
+ adjacent_object_offset = offset;
+
+ while (msgpack_unpack_next(
+ &adjacent_object,
+ file->mult_sbuf.data,
+ file->mult_sbuf.size,
+ &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) {
+ if (adjacent_object.data.type != MSGPACK_OBJECT_STR) {
+ break;
+ }
+
+ result = flb_log_event_encoder_append_body_string_body(
+ context,
+ "\n",
+ 1);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+
+ result = flb_log_event_encoder_append_body_string_body(
+ context,
+ (char *) adjacent_object.data.via.str.ptr,
+ adjacent_object.data.via.str.size);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+ }
+ }
+ }
+ else {
+ result = flb_log_event_encoder_append_body_msgpack_object(context,
+ &entry_value);
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&current_object);
+ msgpack_unpacked_destroy(&adjacent_object);
+
+ /* Reset status */
+ file->mult_firstline = FLB_FALSE;
+ file->mult_skipping = FLB_FALSE;
+ file->mult_keys = 0;
+ file->mult_flush_timeout = 0;
+
+ msgpack_sbuffer_destroy(&file->mult_sbuf);
+
+ file->mult_sbuf.data = NULL;
+
+ flb_time_zero(&file->mult_time);
+
+ return result;
+}
+
+/* Flush any multiline context data into outgoing buffers */
+int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx)
+{
+ int result;
+
+ /* nothing to flush */
+ if (file->mult_firstline == FLB_FALSE) {
+ return -1;
+ }
+
+ if (file->mult_keys == 0) {
+ return -1;
+ }
+
+ result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ file->ml_log_event_encoder, &file->mult_time);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_tail_mult_pack_line_body(
+ file->ml_log_event_encoder,
+ file);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(
+ file->ml_log_event_encoder);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ flb_input_log_append(ctx->ins,
+ file->tag_buf,
+ file->tag_len,
+ file->ml_log_event_encoder->output_buffer,
+ file->ml_log_event_encoder->output_length);
+ result = 0;
+ }
+ else {
+ flb_plg_error(file->config->ins, "error packing event : %d", result);
+
+ result = -1;
+ }
+
+ flb_log_event_encoder_reset(file->ml_log_event_encoder);
+
+ return result;
+}
+
+static void file_pending_flush(struct flb_tail_config *ctx,
+ struct flb_tail_file *file, time_t now)
+{
+ if (file->mult_flush_timeout > now) {
+ return;
+ }
+
+ if (file->mult_firstline == FLB_FALSE) {
+ if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) {
+ return;
+ }
+ }
+
+ flb_tail_mult_flush(file, ctx);
+}
+
+int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx)
+{
+ time_t expired;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+
+ expired = time(NULL) + 3600;
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, expired);
+ }
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ file_pending_flush(ctx, file, expired);
+ }
+
+ return 0;
+}
+
+int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ time_t now;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+ struct flb_tail_config *ctx = context;
+
+ now = time(NULL);
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+
+ file_pending_flush(ctx, file, now);
+ }
+
+ /* Iterate promoted event files with pending bytes */
+ mk_list_foreach(head, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+
+ file_pending_flush(ctx, file, now);
+ }
+
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_multiline.h b/src/fluent-bit/plugins/in_tail/tail_multiline.h
new file mode 100644
index 000000000..d7f7539b1
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_multiline.h
@@ -0,0 +1,57 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_TAIL_MULT_H
+#define FLB_TAIL_TAIL_MULT_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+#include "tail_config.h"
+#include "tail_file.h"
+
+#define FLB_TAIL_MULT_NA -1 /* not applicable as a multiline stream */
+#define FLB_TAIL_MULT_DONE 0 /* finished a multiline stream */
+#define FLB_TAIL_MULT_MORE 1 /* expect more lines to come */
+#define FLB_TAIL_MULT_FLUSH "4" /* max flush time for multiline: 4 seconds */
+
+struct flb_tail_mult {
+ struct flb_parser *parser;
+ struct mk_list _head;
+};
+
+int flb_tail_mult_create(struct flb_tail_config *ctx,
+ struct flb_input_instance *ins,
+ struct flb_config *config);
+
+int flb_tail_mult_destroy(struct flb_tail_config *ctx);
+
+int flb_tail_mult_process_content(time_t now,
+ char *buf, size_t len,
+ struct flb_tail_file *file,
+ struct flb_tail_config *ctx,
+ size_t processed_bytes);
+int flb_tail_mult_flush(struct flb_tail_file *file,
+ struct flb_tail_config *ctx);
+
+int flb_tail_mult_pending_flush(struct flb_input_instance *ins,
+ struct flb_config *config, void *context);
+int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_scan.c b/src/fluent-bit/plugins/in_tail/tail_scan.c
new file mode 100644
index 000000000..ccb8e070a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_scan.c
@@ -0,0 +1,71 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include "tail.h"
+#include "tail_config.h"
+
+/*
+ * Include proper scan backend
+ */
+#ifdef FLB_SYSTEM_WINDOWS
+#include "tail_scan_win32.c"
+#else
+#include "tail_scan_glob.c"
+#endif
+
+int flb_tail_scan(struct mk_list *path_list, struct flb_tail_config *ctx)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_slist_entry *pattern;
+
+ mk_list_foreach(head, path_list) {
+ pattern = mk_list_entry(head, struct flb_slist_entry, _head);
+ ret = tail_scan_path(pattern->str, ctx);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "error scanning path: %s", pattern->str);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "%i new files found on path '%s'",
+ ret, pattern->str);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Triggered by refresh_interval, it re-scan the path looking for new files
+ * that match the original path pattern.
+ */
+int flb_tail_scan_callback(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ int ret;
+ struct flb_tail_config *ctx = context;
+ (void) config;
+
+ ret = flb_tail_scan(ctx->path_list, ctx);
+ if (ret > 0) {
+ flb_plg_debug(ins, "%i new files found", ret);
+ }
+
+ return ret;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_scan.h b/src/fluent-bit/plugins/in_tail/tail_scan.h
new file mode 100644
index 000000000..ec3c96a2a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_scan.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_SCAN_H
+#define FLB_TAIL_SCAN_H
+
+#include "tail_config.h"
+
+int flb_tail_scan(struct mk_list *path, struct flb_tail_config *ctx);
+int flb_tail_scan_callback(struct flb_input_instance *ins,
+ struct flb_config *config, void *context);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_scan_glob.c b/src/fluent-bit/plugins/in_tail/tail_scan_glob.c
new file mode 100644
index 000000000..b330b7c3b
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_scan_glob.c
@@ -0,0 +1,278 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <glob.h>
+#include <fnmatch.h>
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "tail.h"
+#include "tail_file.h"
+#include "tail_signal.h"
+#include "tail_scan.h"
+#include "tail_config.h"
+
+/* Define missing GLOB_TILDE if not exists */
+#ifndef GLOB_TILDE
+#define GLOB_TILDE 1<<2 /* use GNU Libc value */
+#define UNSUP_TILDE 1
+
+/* we need these extra headers for path resolution */
+#include <limits.h>
+#include <sys/types.h>
+#include <pwd.h>
+
+static char *expand_tilde(const char *path)
+{
+ int len;
+ char user[256];
+ char *p = NULL;
+ char *dir = NULL;
+ char *tmp = NULL;
+ struct passwd *uinfo = NULL;
+
+ if (path[0] == '~') {
+ p = strchr(path, '/');
+
+ if (p) {
+ /* check case '~/' */
+ if ((p - path) == 1) {
+ dir = getenv("HOME");
+ if (!dir) {
+ return path;
+ }
+ }
+ else {
+ /*
+ * it refers to a different user: ~user/abc, first step grab
+ * the user name.
+ */
+ len = (p - path) - 1;
+ memcpy(user, path + 1, len);
+ user[len] = '\0';
+
+ /* use getpwnam() to resolve user information */
+ uinfo = getpwnam(user);
+ if (!uinfo) {
+ return path;
+ }
+
+ dir = uinfo->pw_dir;
+ }
+ }
+ else {
+ dir = getenv("HOME");
+ if (!dir) {
+ return path;
+ }
+ }
+
+ if (p) {
+ tmp = flb_malloc(PATH_MAX);
+ if (!tmp) {
+ flb_errno();
+ return NULL;
+ }
+ snprintf(tmp, PATH_MAX - 1, "%s%s", dir, p);
+ }
+ else {
+ dir = getenv("HOME");
+ if (!dir) {
+ return path;
+ }
+
+ tmp = flb_strdup(dir);
+ if (!tmp) {
+ return path;
+ }
+ }
+
+ return tmp;
+ }
+
+ return path;
+}
+#endif
+
+static int tail_is_excluded(char *path, struct flb_tail_config *ctx)
+{
+ struct mk_list *head;
+ struct flb_slist_entry *pattern;
+
+ if (!ctx->exclude_list) {
+ return FLB_FALSE;
+ }
+
+ mk_list_foreach(head, ctx->exclude_list) {
+ pattern = mk_list_entry(head, struct flb_slist_entry, _head);
+ if (fnmatch(pattern->str, path, 0) == 0) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static inline int do_glob(const char *pattern, int flags,
+ void *not_used, glob_t *pglob)
+{
+ int ret;
+ int new_flags;
+ char *tmp = NULL;
+ int tmp_needs_free = FLB_FALSE;
+ (void) not_used;
+
+ /* Save current values */
+ new_flags = flags;
+
+ if (flags & GLOB_TILDE) {
+#ifdef UNSUP_TILDE
+ /*
+ * Some libc libraries like Musl do not support GLOB_TILDE for tilde
+ * expansion. A workaround is to use wordexp(3) but looking at it
+ * implementation in Musl it looks quite expensive:
+ *
+ * http://git.musl-libc.org/cgit/musl/tree/src/misc/wordexp.c
+ *
+ * the workaround is to do our own tilde expansion in a temporary buffer.
+ */
+
+ /* Look for a tilde */
+ tmp = expand_tilde(pattern);
+ if (tmp != pattern) {
+ /* the path was expanded */
+ pattern = tmp;
+ tmp_needs_free = FLB_TRUE;
+ }
+
+ /* remove unused flag */
+ new_flags &= ~GLOB_TILDE;
+#endif
+ }
+
+ /* invoke glob with new parameters */
+ ret = glob(pattern, new_flags, NULL, pglob);
+
+ /* remove temporary buffer, if allocated by expand_tilde above.
+ * Note that this buffer is only used for libc implementations
+ * that do not support the GLOB_TILDE flag, like musl. */
+ if ((tmp != NULL) && (tmp_needs_free == FLB_TRUE)) {
+ flb_free(tmp);
+ }
+
+ return ret;
+}
+
+/* Scan a path, register the entries and return how many */
+static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
+{
+ int i;
+ int ret;
+ int count = 0;
+ glob_t globbuf;
+ time_t now;
+ int64_t mtime;
+ struct stat st;
+
+ flb_plg_debug(ctx->ins, "scanning path %s", path);
+
+ /* Safe reset for globfree() */
+ globbuf.gl_pathv = NULL;
+
+ /* Scan the given path */
+ ret = do_glob(path, GLOB_TILDE | GLOB_ERR, NULL, &globbuf);
+ if (ret != 0) {
+ switch (ret) {
+ case GLOB_NOSPACE:
+ flb_plg_error(ctx->ins, "no memory space available");
+ return -1;
+ case GLOB_ABORTED:
+ flb_plg_error(ctx->ins, "read error, check permissions: %s", path);
+ return -1;
+ case GLOB_NOMATCH:
+ ret = stat(path, &st);
+ if (ret == -1) {
+ flb_plg_debug(ctx->ins, "cannot read info from: %s", path);
+ }
+ else {
+ ret = access(path, R_OK);
+ if (ret == -1 && errno == EACCES) {
+ flb_plg_error(ctx->ins, "NO read access for path: %s", path);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "NO matches for path: %s", path);
+ }
+ }
+ return 0;
+ }
+ }
+
+
+ /* For every entry found, generate an output list */
+ now = time(NULL);
+ for (i = 0; i < globbuf.gl_pathc; i++) {
+ ret = stat(globbuf.gl_pathv[i], &st);
+ if (ret == 0 && S_ISREG(st.st_mode)) {
+ /* Check if this file is blacklisted */
+ if (tail_is_excluded(globbuf.gl_pathv[i], ctx) == FLB_TRUE) {
+ flb_plg_debug(ctx->ins, "excluded=%s", globbuf.gl_pathv[i]);
+ continue;
+ }
+
+ if (ctx->ignore_older > 0) {
+ mtime = flb_tail_stat_mtime(&st);
+ if (mtime > 0) {
+ if ((now - ctx->ignore_older) > mtime) {
+ flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
+ globbuf.gl_pathv[i]);
+ continue;
+ }
+ }
+ }
+
+ /* Append file to list */
+ ret = flb_tail_file_append(globbuf.gl_pathv[i], &st,
+ FLB_TAIL_STATIC, ctx);
+ if (ret == 0) {
+ flb_plg_debug(ctx->ins, "scan_glob add(): %s, inode %li",
+ globbuf.gl_pathv[i], st.st_ino);
+ count++;
+ }
+ else {
+ flb_plg_debug(ctx->ins, "scan_blog add(): dismissed: %s, inode %li",
+ globbuf.gl_pathv[i], st.st_ino);
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins, "skip (invalid) entry=%s",
+ globbuf.gl_pathv[i]);
+ }
+ }
+
+ if (count > 0) {
+ tail_signal_manager(ctx);
+ }
+
+ globfree(&globbuf);
+ return count;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_scan_win32.c b/src/fluent-bit/plugins/in_tail/tail_scan_win32.c
new file mode 100644
index 000000000..94733f065
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_scan_win32.c
@@ -0,0 +1,245 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file implements glob-like patch matching feature for Windows
+ * based on Win32 API.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_utils.h>
+
+#include <shlwapi.h>
+
+#include "tail.h"
+#include "tail_file.h"
+#include "tail_signal.h"
+#include "tail_config.h"
+
+#include "win32.h"
+
+static int tail_is_excluded(char *path, struct flb_tail_config *ctx)
+{
+ struct mk_list *head;
+ struct flb_slist_entry *pattern;
+
+ if (!ctx->exclude_list) {
+ return FLB_FALSE;
+ }
+
+ mk_list_foreach(head, ctx->exclude_list) {
+ pattern = mk_list_entry(head, struct flb_slist_entry, _head);
+ if (PathMatchSpecA(path, pattern->str)) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+/*
+ * This function is a thin wrapper over flb_tail_file_append(),
+ * adding normalization and sanity checks on top of it.
+ */
+static int tail_register_file(const char *target, struct flb_tail_config *ctx,
+ time_t ts)
+{
+ int64_t mtime;
+ struct stat st;
+ char path[MAX_PATH];
+
+ if (_fullpath(path, target, MAX_PATH) == NULL) {
+ flb_plg_error(ctx->ins, "cannot get absolute path of %s", target);
+ return -1;
+ }
+
+ if (stat(path, &st) != 0 || !S_ISREG(st.st_mode)) {
+ return -1;
+ }
+
+ if (ctx->ignore_older > 0) {
+ mtime = flb_tail_stat_mtime(&st);
+ if (mtime > 0) {
+ if ((ts - ctx->ignore_older) > mtime) {
+ flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
+ target);
+ return -1;
+ }
+ }
+ }
+
+ if (tail_is_excluded(path, ctx) == FLB_TRUE) {
+ flb_plg_trace(ctx->ins, "skip '%s' (excluded)", path);
+ return -1;
+ }
+
+ return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ctx);
+}
+
+/*
+ * Perform patern match on the given path string. This function
+ * supports patterns with "nested" wildcards like below.
+ *
+ * tail_scan_pattern("C:\fluent-bit\*\*.txt", ctx);
+ *
+ * On success, the number of files found is returned (zero indicates
+ * "no file found"). On error, -1 is returned.
+ */
+static int tail_scan_pattern(const char *path, struct flb_tail_config *ctx)
+{
+ char *star, *p0, *p1;
+ char pattern[MAX_PATH];
+ char buf[MAX_PATH];
+ int ret;
+ int n_added = 0;
+ time_t now;
+ int64_t mtime;
+ HANDLE h;
+ WIN32_FIND_DATA data;
+
+ if (strlen(path) > MAX_PATH - 1) {
+ flb_plg_error(ctx->ins, "path too long '%s'");
+ return -1;
+ }
+
+ star = strchr(path, '*');
+ if (star == NULL) {
+ return -1;
+ }
+
+ /*
+ * C:\data\tmp\input_*.conf
+ * 0<-----|
+ */
+ p0 = star;
+ while (path <= p0 && *p0 != '\\') {
+ p0--;
+ }
+
+ /*
+ * C:\data\tmp\input_*.conf
+ * |---->1
+ */
+ p1 = star;
+ while (*p1 && *p1 != '\\') {
+ p1++;
+ }
+
+ memcpy(pattern, path, (p1 - path));
+ pattern[p1 - path] = '\0';
+
+ h = FindFirstFileA(pattern, &data);
+ if (h == INVALID_HANDLE_VALUE) {
+ return 0; /* none matched */
+ }
+
+ now = time(NULL);
+ do {
+ /* Ignore the current and parent dirs */
+ if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) {
+ continue;
+ }
+
+ /* Avoid an infinite loop */
+ if (strchr(data.cFileName, '*')) {
+ continue;
+ }
+
+ /* Create a path (prefix + filename + suffix) */
+ memcpy(buf, path, p0 - path + 1);
+ buf[p0 - path + 1] = '\0';
+
+ if (strlen(buf) + strlen(data.cFileName) + strlen(p1) > MAX_PATH - 1) {
+ flb_plg_warn(ctx->ins, "'%s%s%s' is too long", buf, data.cFileName, p1);
+ continue;
+ }
+ strcat(buf, data.cFileName);
+ strcat(buf, p1);
+
+ if (strchr(p1, '*')) {
+ ret = tail_scan_pattern(buf, ctx); /* recursive */
+ if (ret >= 0) {
+ n_added += ret;
+ }
+ continue;
+ }
+
+ /* Try to register the target file */
+ ret = tail_register_file(buf, ctx, now);
+ if (ret == 0) {
+ n_added++;
+ }
+ } while (FindNextFileA(h, &data) != 0);
+
+ FindClose(h);
+ return n_added;
+}
+
+static int tail_filepath(char *buf, int len, const char *basedir, const char *filename)
+{
+ char drive[_MAX_DRIVE];
+ char dir[_MAX_DIR];
+ char fname[_MAX_FNAME];
+ char ext[_MAX_EXT];
+ char tmp[MAX_PATH];
+ int ret;
+
+ ret = _splitpath_s(basedir, drive, _MAX_DRIVE, dir, _MAX_DIR, NULL, 0, NULL, 0);
+ if (ret) {
+ return -1;
+ }
+
+ ret = _splitpath_s(filename, NULL, 0, NULL, 0, fname, _MAX_FNAME, ext, _MAX_EXT);
+ if (ret) {
+ return -1;
+ }
+
+ ret = _makepath_s(tmp, MAX_PATH, drive, dir, fname, ext);
+ if (ret) {
+ return -1;
+ }
+
+ if (_fullpath(buf, tmp, len) == NULL) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
+{
+ int ret;
+ int n_added = 0;
+ time_t now;
+
+ if (strchr(path, '*')) {
+ return tail_scan_pattern(path, ctx);
+ }
+
+ /* No wildcard involved. Let's just handle the file... */
+ now = time(NULL);
+ ret = tail_register_file(path, ctx, now);
+ if (ret == 0) {
+ n_added++;
+ }
+
+ return n_added;
+}
diff --git a/src/fluent-bit/plugins/in_tail/tail_signal.h b/src/fluent-bit/plugins/in_tail/tail_signal.h
new file mode 100644
index 000000000..1a81fec64
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_signal.h
@@ -0,0 +1,98 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_SIGNAL_H
+#define FLB_TAIL_SIGNAL_H
+
+#include "tail_config.h"
+
+static inline int tail_signal_manager(struct flb_tail_config *ctx)
+{
+ int n;
+ uint64_t val = 0xc001;
+
+ /*
+ * The number of signal reads might be less than the written signals, this
+ * means that some event is still pending in the queue. On that case we
+ * don't need to signal it again.
+ */
+ if (ctx->ch_reads < ctx->ch_writes) {
+ return 1;
+ }
+
+ /* Reset counters: prevent an overflow, unlikely..but let's keep safe */
+ if (ctx->ch_reads == ctx->ch_writes) {
+ ctx->ch_reads = 0;
+ ctx->ch_writes = 0;
+ }
+
+ /* Insert a dummy event into the channel manager */
+ n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val));
+ if (n == -1) {
+ flb_errno();
+ return -1;
+ }
+ else {
+ ctx->ch_writes++;
+ }
+
+ return n;
+}
+
+static inline int tail_signal_pending(struct flb_tail_config *ctx)
+{
+ int n;
+ uint64_t val = 0xc002;
+
+ /* Insert a dummy event into the 'pending' channel */
+ n = flb_pipe_w(ctx->ch_pending[1], (const char *) &val, sizeof(val));
+
+ /*
+ * If we get EAGAIN, it simply means pending channel is full. As
+ * notification is already pending, it's safe to ignore.
+ */
+ if (n == -1 && !FLB_PIPE_WOULDBLOCK()) {
+ flb_errno();
+ return -1;
+ }
+
+ return n;
+}
+
+static inline int tail_consume_pending(struct flb_tail_config *ctx)
+{
+ int ret;
+ uint64_t val;
+
+ /*
+ * We need to consume the pending bytes. Loop until we would have
+ * blocked (pipe is empty).
+ */
+ do {
+ ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val));
+ if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) {
+ flb_errno();
+ return -1;
+ }
+ } while (!FLB_PIPE_WOULDBLOCK());
+
+ return 0;
+}
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/tail_sql.h b/src/fluent-bit/plugins/in_tail/tail_sql.h
new file mode 100644
index 000000000..855933a01
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/tail_sql.h
@@ -0,0 +1,65 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_SQL_H
+#define FLB_TAIL_SQL_H
+
+/*
+ * In Fluent Bit we try to have a common convention for table names,
+ * if the table belong to an input/output plugin, use plugin name
+ * plus to what it's about, e.g:
+ *
+ * in_tail plugin table to track files: in_tail_files
+ */
+#define SQL_CREATE_FILES \
+ "CREATE TABLE IF NOT EXISTS in_tail_files (" \
+ " id INTEGER PRIMARY KEY," \
+ " name TEXT NOT NULL," \
+ " offset INTEGER," \
+ " inode INTEGER," \
+ " created INTEGER," \
+ " rotated INTEGER DEFAULT 0" \
+ ");"
+
+#define SQL_GET_FILE \
+ "SELECT * from in_tail_files WHERE inode=@inode order by id desc;"
+
+#define SQL_INSERT_FILE \
+ "INSERT INTO in_tail_files (name, offset, inode, created)" \
+ " VALUES (@name, @offset, @inode, @created);"
+
+#define SQL_ROTATE_FILE \
+ "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;"
+
+#define SQL_UPDATE_OFFSET \
+ "UPDATE in_tail_files set offset=@offset WHERE id=@id;"
+
+#define SQL_DELETE_FILE \
+ "DELETE FROM in_tail_files WHERE id=@id;"
+
+#define SQL_PRAGMA_SYNC \
+ "PRAGMA synchronous=%i;"
+
+#define SQL_PRAGMA_JOURNAL_MODE \
+ "PRAGMA journal_mode=%s;"
+
+#define SQL_PRAGMA_LOCKING_MODE \
+ "PRAGMA locking_mode=EXCLUSIVE;"
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/win32.h b/src/fluent-bit/plugins/in_tail/win32.h
new file mode 100644
index 000000000..a9414f892
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/win32.h
@@ -0,0 +1,67 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is the interface file that replaces POSIX functions
+ * with our own custom implementation.
+ */
+
+#ifndef FLB_TAIL_WIN32_H
+#define FLB_TAIL_WIN32_H
+
+#include "win32/interface.h"
+
+#undef open
+#undef stat
+#undef lstat
+#undef fstat
+#undef lseek
+
+#undef S_IFDIR
+#undef S_IFCHR
+#undef S_IFIFO
+#undef S_IFREG
+#undef S_IFLNK
+#undef S_IFMT
+#undef S_ISDIR
+#undef S_ISCHR
+#undef S_ISFIFO
+#undef S_ISREG
+#undef S_ISLNK
+
+#define open win32_open
+#define stat win32_stat
+#define lstat win32_lstat
+#define fstat win32_fstat
+
+#define lseek _lseeki64
+
+#define S_IFDIR WIN32_S_IFDIR
+#define S_IFCHR WIN32_S_IFCHR
+#define S_IFIFO WIN32_S_IFIFO
+#define S_IFREG WIN32_S_IFREG
+#define S_IFLNK WIN32_S_IFLNK
+#define S_IFMT WIN32_S_IFMT
+
+#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR)
+#define S_ISCHR(m) (((m) & S_IFMT) == S_IFCHR)
+#define S_ISIFO(m) (((m) & S_IFMT) == S_IFIFO)
+#define S_ISREG(m) (((m) & S_IFMT) == S_IFREG)
+#define S_ISLNK(m) (((m) & S_IFMT) == S_IFLNK)
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/win32/interface.h b/src/fluent-bit/plugins/in_tail/win32/interface.h
new file mode 100644
index 000000000..73b2ef233
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/win32/interface.h
@@ -0,0 +1,44 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TAIL_WIN32_INTERFACE_H
+#define FLB_TAIL_WIN32_INTERFACE_H
+
+struct win32_stat {
+ uint64_t st_ino;
+ uint16_t st_mode;
+ int64_t st_mtime;
+ int16_t st_nlink;
+ int64_t st_size;
+};
+
+int win32_stat(const char *path, struct win32_stat *wst);
+int win32_lstat(const char *path, struct win32_stat *wst);
+int win32_fstat(int fd, struct win32_stat *wst);
+
+int win32_open(const char *path, int flags);
+
+#define WIN32_S_IFDIR 0x1000
+#define WIN32_S_IFCHR 0x2000
+#define WIN32_S_IFIFO 0x4000
+#define WIN32_S_IFREG 0x8000
+#define WIN32_S_IFLNK 0xc000
+#define WIN32_S_IFMT 0xf000
+
+#endif
diff --git a/src/fluent-bit/plugins/in_tail/win32/io.c b/src/fluent-bit/plugins/in_tail/win32/io.c
new file mode 100644
index 000000000..45928b04a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/win32/io.c
@@ -0,0 +1,47 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Windows.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <fcntl.h>
+#include <io.h>
+#include "interface.h"
+
+/*
+ * POSIX IO emulation tailored for in_tail's usage.
+ *
+ * open(2) that does not acquire an exclusive lock.
+ */
+
+int win32_open(const char *path, int flags)
+{
+ HANDLE h;
+ h = CreateFileA(path,
+ GENERIC_READ,
+ FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
+ NULL, /* lpSecurityAttributes */
+ OPEN_EXISTING, /* dwCreationDisposition */
+ 0, /* dwFlagsAndAttributes */
+ NULL); /* hTemplateFile */
+ if (h == INVALID_HANDLE_VALUE) {
+ return -1;
+ }
+ return _open_osfhandle((intptr_t) h, _O_RDONLY);
+}
diff --git a/src/fluent-bit/plugins/in_tail/win32/stat.c b/src/fluent-bit/plugins/in_tail/win32/stat.c
new file mode 100644
index 000000000..bce802749
--- /dev/null
+++ b/src/fluent-bit/plugins/in_tail/win32/stat.c
@@ -0,0 +1,332 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Windows.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <io.h>
+#include "interface.h"
+
+/*
+ * NTFS stat(2) emulation tailored for in_tail's usage.
+ *
+ * (1) Support st_ino (inode) for Windows NTFS.
+ * (2) Support NTFS symlinks.
+ * (3) Support large files >= 2GB.
+ *
+ * To use it, include "win32.h" and it will transparently
+ * replace stat(), lstat() and fstat().
+ */
+
+#define UINT64(high, low) ((uint64_t) (high) << 32 | (low))
+#define WINDOWS_TICKS_TO_SECONDS_RATIO 10000000
+#define WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA 11644473600
+
+/*
+ * FILETIME timestamps are represented in 100-nanosecond intervals,
+ * because of this, that's why we need to divide the number by 10000000
+ * in order to convert it to seconds.
+ *
+ * While UNIX timestamps use January 1, 1970 as epoch Windows FILETIME
+ * timestamps use January 1, 1601. Because of this we need to subtract
+ * 11644473600 seconds to account for it.
+ *
+ * Note: Even though this does not account for leap seconds it should be
+ * accurate enough.
+ */
+
+static uint64_t filetime_to_epoch(FILETIME *ft)
+{
+ ULARGE_INTEGER timestamp;
+
+ if (ft == NULL) {
+ return 0;
+ }
+
+ timestamp.HighPart = ft->dwHighDateTime;
+ timestamp.LowPart = ft->dwLowDateTime;
+
+ timestamp.QuadPart /= WINDOWS_TICKS_TO_SECONDS_RATIO;
+ timestamp.QuadPart -= WINDOWS_EPOCH_TO_UNIX_EPOCH_DELTA;
+
+ return timestamp.QuadPart;
+}
+
+static void reset_errno()
+{
+ errno = 0;
+}
+
+static void propagate_last_error_to_errno()
+{
+ DWORD error_code;
+
+ error_code = GetLastError();
+
+ switch (error_code) {
+ case ERROR_INVALID_TARGET_HANDLE:
+ case ERROR_INVALID_HANDLE:
+ errno = EBADF;
+ break;
+
+ case ERROR_TOO_MANY_OPEN_FILES:
+ errno = EMFILE;
+ break;
+
+ case ERROR_INVALID_FLAG_NUMBER:
+ case ERROR_INVALID_PARAMETER:
+ errno = EINVAL;
+ break;
+
+ case ERROR_NOT_ENOUGH_MEMORY:
+ case ERROR_OUTOFMEMORY:
+ errno = ENOMEM;
+ break;
+
+ case ERROR_SHARING_VIOLATION:
+ case ERROR_LOCK_VIOLATION:
+ case ERROR_PATH_BUSY:
+ case ERROR_BUSY:
+ errno = EBUSY;
+ break;
+
+ case ERROR_HANDLE_DISK_FULL:
+ case ERROR_DISK_FULL:
+ errno = ENOSPC;
+ break;
+
+ case ERROR_INVALID_ADDRESS:
+ errno = EFAULT;
+ break;
+
+ case ERROR_FILE_TOO_LARGE:
+ errno = EFBIG;
+ break;
+
+ case ERROR_ALREADY_EXISTS:
+ case ERROR_FILE_EXISTS:
+ errno = EEXIST;
+ break;
+
+ case ERROR_FILE_NOT_FOUND:
+ case ERROR_PATH_NOT_FOUND:
+ case ERROR_INVALID_DRIVE:
+ case ERROR_BAD_PATHNAME:
+ case ERROR_INVALID_NAME:
+ case ERROR_BAD_UNIT:
+ errno = ENOENT;
+ break;
+
+ case ERROR_SEEK_ON_DEVICE:
+ case ERROR_NEGATIVE_SEEK:
+ errno = ESPIPE;
+ break;
+
+ case ERROR_ACCESS_DENIED:
+ errno = EACCES;
+ break;
+
+ case ERROR_DIR_NOT_EMPTY:
+ errno = ENOTEMPTY;
+ break;
+
+ case ERROR_BROKEN_PIPE:
+ errno = EPIPE;
+ break;
+
+ case ERROR_GEN_FAILURE:
+ errno = EIO;
+ break;
+
+ case ERROR_OPEN_FAILED:
+ errno = EIO;
+ break;
+
+ case ERROR_SUCCESS:
+ errno = 0;
+ break;
+
+ default:
+ /* This is just a canary, if you find this
+ * error then it means we need to expand the
+ * translation list.
+ */
+
+ errno = EOWNERDEAD;
+ break;
+ }
+}
+
+static int get_mode(unsigned int attr)
+{
+ if (attr & FILE_ATTRIBUTE_DIRECTORY) {
+ return WIN32_S_IFDIR;
+ }
+ return WIN32_S_IFREG;
+}
+
+
+
+static int is_symlink(const char *path)
+{
+ WIN32_FIND_DATA data;
+ HANDLE h;
+
+ SetLastError(0);
+ reset_errno();
+
+ h = FindFirstFileA(path, &data);
+
+ if (h == INVALID_HANDLE_VALUE) {
+ propagate_last_error_to_errno();
+
+ return 0;
+ }
+
+ FindClose(h);
+
+ /*
+ * A NTFS symlink is a file with a bit of metadata ("reparse point"),
+ * So (1) check if the file has metadata and then (2) confirm that
+ * it is indeed a symlink.
+ */
+ if (data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
+ if (data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+static int hstat(HANDLE h, struct win32_stat *wst)
+{
+ BY_HANDLE_FILE_INFORMATION info;
+ FILE_STANDARD_INFO std;
+
+ SetLastError(0);
+ reset_errno();
+
+ if (!GetFileInformationByHandle(h, &info)) {
+ propagate_last_error_to_errno();
+
+ return -1;
+ }
+
+ if (!GetFileInformationByHandleEx(h, FileStandardInfo,
+ &std, sizeof(std))) {
+ propagate_last_error_to_errno();
+
+ return -1;
+ }
+
+ wst->st_nlink = std.NumberOfLinks;
+ if (std.DeletePending) {
+ wst->st_nlink = 0;
+ }
+
+ wst->st_mode = get_mode(info.dwFileAttributes);
+ wst->st_size = UINT64(info.nFileSizeHigh, info.nFileSizeLow);
+ wst->st_ino = UINT64(info.nFileIndexHigh, info.nFileIndexLow);
+ wst->st_mtime = filetime_to_epoch(&info.ftLastWriteTime);
+
+ return 0;
+}
+
+int win32_stat(const char *path, struct win32_stat *wst)
+{
+ HANDLE h;
+
+ SetLastError(0);
+ reset_errno();
+
+ h = CreateFileA(path,
+ GENERIC_READ,
+ FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
+ NULL, /* lpSecurityAttributes */
+ OPEN_EXISTING, /* dwCreationDisposition */
+ 0, /* dwFlagsAndAttributes */
+ NULL); /* hTemplateFile */
+
+ if (h == INVALID_HANDLE_VALUE) {
+ propagate_last_error_to_errno();
+
+ return -1;
+ }
+
+ if (hstat(h, wst)) {
+ CloseHandle(h);
+ return -1;
+ }
+
+ CloseHandle(h);
+ return 0;
+}
+
+int win32_lstat(const char *path, struct win32_stat *wst)
+{
+ HANDLE h;
+
+ SetLastError(0);
+ reset_errno();
+
+ h = CreateFileA(path,
+ GENERIC_READ,
+ FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
+ NULL, /* lpSecurityAttributes */
+ OPEN_EXISTING, /* dwCreationDisposition */
+ FILE_FLAG_OPEN_REPARSE_POINT,
+ NULL); /* hTemplateFile */
+
+ if (h == INVALID_HANDLE_VALUE) {
+ propagate_last_error_to_errno();
+
+ return -1;
+ }
+
+ if (hstat(h, wst)) {
+ CloseHandle(h);
+ return -1;
+ }
+
+ if (is_symlink(path)) {
+ wst->st_mode = WIN32_S_IFLNK;
+ }
+
+ CloseHandle(h);
+ return 0;
+}
+
+int win32_fstat(int fd, struct win32_stat *wst)
+{
+ HANDLE h;
+
+ SetLastError(0);
+ reset_errno();
+
+ h = (HANDLE) _get_osfhandle(fd);
+
+ if (h == INVALID_HANDLE_VALUE) {
+ propagate_last_error_to_errno();
+
+ return -1;
+ }
+
+ return hstat(h, wst);
+}