summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_tail/tail_file.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_file.c')
-rw-r--r--fluent-bit/plugins/in_tail/tail_file.c1860
1 files changed, 1860 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c
new file mode 100644
index 000000000..2385f0626
--- /dev/null
+++ b/fluent-bit/plugins/in_tail/tail_file.c
@@ -0,0 +1,1860 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <time.h>
+#ifdef FLB_SYSTEM_FREEBSD
+#include <sys/user.h>
+#include <libutil.h>
+#endif
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_parser.h>
+#ifdef FLB_HAVE_REGEX
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_hash_table.h>
+#endif
+
+#include "tail.h"
+#include "tail_file.h"
+#include "tail_config.h"
+#include "tail_db.h"
+#include "tail_signal.h"
+#include "tail_dockermode.h"
+#include "tail_multiline.h"
+#include "tail_scan.h"
+
+#ifdef FLB_SYSTEM_WINDOWS
+#include "win32.h"
+#endif
+
+#include <cfl/cfl.h>
+
+static inline void consume_bytes(char *buf, int bytes, int length)
+{
+ memmove(buf, buf + bytes, length - bytes);
+}
+
+static uint64_t stat_get_st_dev(struct stat *st)
+{
+#ifdef FLB_SYSTEM_WINDOWS
+ /* do you want to contribute with a way to extract volume serial number ? */
+ return 0;
+#else
+ return st->st_dev;
+#endif
+}
+
+static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st,
+ uint64_t *out_hash)
+{
+ int len;
+ uint64_t st_dev;
+ char tmp[64];
+
+ st_dev = stat_get_st_dev(st);
+
+ len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64,
+ st_dev, (uint64_t)st->st_ino);
+
+ *out_hash = cfl_hash_64bits(tmp, len);
+ return 0;
+}
+
+static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st,
+ flb_sds_t *key)
+{
+ uint64_t st_dev;
+ flb_sds_t tmp;
+ flb_sds_t buf;
+
+ buf = flb_sds_create_size(64);
+ if (!buf) {
+ return -1;
+ }
+
+ st_dev = stat_get_st_dev(st);
+ tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64,
+ st_dev, (uint64_t)st->st_ino);
+ if (!tmp) {
+ flb_sds_destroy(buf);
+ return -1;
+ }
+
+ *key = buf;
+ return 0;
+}
+
+/* Append custom keys and report the number of records processed */
+static int record_append_custom_keys(struct flb_tail_file *file,
+ char *in_data, size_t in_size,
+ char **out_data, size_t *out_size)
+{
+ int i;
+ int ret;
+ int records = 0;
+ msgpack_object k;
+ msgpack_object v;
+ struct flb_log_event event;
+ struct flb_tail_config *ctx;
+ struct flb_log_event_encoder encoder;
+ struct flb_log_event_decoder decoder;
+
+ ctx = (struct flb_tail_config *) file->config;
+
+ ret = flb_log_event_decoder_init(&decoder, in_data, in_size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ return -1;
+ }
+
+ ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_log_event_decoder_destroy(&decoder);
+
+ return -2;
+ }
+
+ while (flb_log_event_decoder_next(&decoder, &event) ==
+ FLB_EVENT_DECODER_SUCCESS) {
+
+ ret = flb_log_event_encoder_begin_record(&encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp);
+ }
+
+ /* append previous map keys */
+ for (i = 0; i < event.body->via.map.size; i++) {
+ k = event.body->via.map.ptr[i].key;
+ v = event.body->via.map.ptr[i].val;
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ &encoder,
+ &k);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ &encoder,
+ &v);
+ }
+ }
+
+ /* path_key */
+ if (ctx->path_key != NULL) {
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->config->path_key);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->orig_name);
+ }
+ }
+
+ /* offset_key */
+ if (ctx->offset_key != NULL) {
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ &encoder,
+ file->config->offset_key);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_uint64(
+ &encoder,
+ file->offset +
+ file->last_processed_bytes);
+ }
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&encoder);
+ }
+ else {
+ flb_plg_error(file->config->ins, "error packing event : %d", ret);
+
+ flb_log_event_encoder_rollback_record(&encoder);
+ }
+
+ /* counter */
+ records++;
+ }
+
+ *out_data = encoder.output_buffer;
+ *out_size = encoder.output_length;
+
+ /* This function transfers ownership of the internal memory allocated by
+ * sbuffer using msgpack_sbuffer_release which means the caller is
+ * responsible for releasing the memory.
+ */
+ flb_log_event_encoder_claim_internal_buffer_ownership(&encoder);
+
+ flb_log_event_decoder_destroy(&decoder);
+ flb_log_event_encoder_destroy(&encoder);
+
+ return records;
+}
+
+static int flb_tail_repack_map(struct flb_log_event_encoder *encoder,
+ char *data,
+ size_t data_size)
+{
+ msgpack_unpacked source_map;
+ size_t offset;
+ int result;
+ size_t index;
+ msgpack_object value;
+ msgpack_object key;
+
+ result = FLB_EVENT_ENCODER_SUCCESS;
+
+ if (data_size > 0) {
+ msgpack_unpacked_init(&source_map);
+
+ offset = 0;
+ result = msgpack_unpack_next(&source_map,
+ data,
+ data_size,
+ &offset);
+
+ if (result == MSGPACK_UNPACK_SUCCESS) {
+ result = FLB_EVENT_ENCODER_SUCCESS;
+ }
+ else {
+ result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
+ }
+
+ for (index = 0;
+ index < source_map.data.via.map.size &&
+ result == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+ key = source_map.data.via.map.ptr[index].key;
+ value = source_map.data.via.map.ptr[index].val;
+
+ result = flb_log_event_encoder_append_body_msgpack_object(
+ encoder,
+ &key);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_msgpack_object(
+ encoder,
+ &value);
+ }
+ }
+
+ msgpack_unpacked_destroy(&source_map);
+ }
+
+ return result;
+}
+
+int flb_tail_pack_line_map(struct flb_time *time, char **data,
+ size_t *data_size, struct flb_tail_file *file,
+ size_t processed_bytes)
+{
+ int result;
+
+ result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ file->sl_log_event_encoder, time);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_tail_repack_map(file->sl_log_event_encoder,
+ *data,
+ *data_size);
+ }
+
+ /* path_key */
+ if (file->config->path_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
+ FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
+ file->orig_name_len));
+ }
+ }
+
+ /* offset_key */
+ if (file->config->offset_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
+ FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
+ }
+ else {
+ flb_log_event_encoder_rollback_record(file->sl_log_event_encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins, "error packing event");
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size,
+ struct flb_tail_file *file, size_t processed_bytes)
+{
+ int result;
+
+ result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ file->sl_log_event_encoder, time);
+ }
+
+ /* path_key */
+ if (file->config->path_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key),
+ FLB_LOG_EVENT_STRING_VALUE(file->orig_name,
+ file->orig_name_len));
+ }
+ }
+
+ /* offset_key */
+ if (file->config->offset_key != NULL) {
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key),
+ FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes));
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ file->sl_log_event_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(file->config->key),
+ FLB_LOG_EVENT_STRING_VALUE(data,
+ data_size));
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins, "error packing event : %d", result);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size)
+{
+ int result;
+
+ result = flb_log_event_encoder_emit_raw_record(
+ file->ml_log_event_encoder,
+ buf_data, buf_size);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(file->config->ins,
+ "log event raw append error : %d",
+ result);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+ if (file->ml_log_event_encoder->output_length > 0) {
+ flb_input_log_append(ctx->ins,
+ file->tag_buf,
+ file->tag_len,
+ file->ml_log_event_encoder->output_buffer,
+ file->ml_log_event_encoder->output_length);
+
+ flb_log_event_encoder_reset(file->ml_log_event_encoder);
+ }
+
+ return 0;
+}
+
+static int process_content(struct flb_tail_file *file, size_t *bytes)
+{
+ size_t len;
+ int lines = 0;
+ int ret;
+ size_t processed_bytes = 0;
+ char *data;
+ char *end;
+ char *p;
+ void *out_buf;
+ size_t out_size;
+ int crlf;
+ char *line;
+ size_t line_len;
+ char *repl_line;
+ size_t repl_line_len;
+ time_t now = time(NULL);
+ struct flb_time out_time = {0};
+ struct flb_tail_config *ctx;
+
+ ctx = (struct flb_tail_config *) file->config;
+
+ /* Parse the data content */
+ data = file->buf_data;
+ end = data + file->buf_len;
+
+ /* reset last processed bytes */
+ file->last_processed_bytes = 0;
+
+ /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */
+ while (data < end && *data == '\0') {
+ data++;
+ processed_bytes++;
+ }
+
+ while (data < end && (p = memchr(data, '\n', end - data))) {
+ len = (p - data);
+ crlf = 0;
+ if (file->skip_next == FLB_TRUE) {
+ data += len + 1;
+ processed_bytes += len + 1;
+ file->skip_next = FLB_FALSE;
+ continue;
+ }
+
+ /*
+ * Empty line (just breakline)
+ * ---------------------------
+ * [NOTE] with the new Multiline core feature and Multiline Filter on
+ * Fluent Bit v1.8.2, there are a couple of cases where stack traces
+ * or multi line patterns expects an empty line (meaning only the
+ * breakline), skipping empty lines on this plugin will break that
+ * functionality.
+ *
+ * We are introducing 'skip_empty_lines=off' configuration
+ * property to revert this behavior if some user is affected by
+ * this change.
+ */
+
+ if (len == 0 && ctx->skip_empty_lines) {
+ data++;
+ processed_bytes++;
+ continue;
+ }
+
+ /* Process '\r\n' */
+ if (len >= 2) {
+ crlf = (data[len-1] == '\r');
+ if (len == 1 && crlf) {
+ data += 2;
+ processed_bytes += 2;
+ continue;
+ }
+ }
+
+ /* Reset time for each line */
+ flb_time_zero(&out_time);
+
+ line = data;
+ line_len = len - crlf;
+ repl_line = NULL;
+
+ if (ctx->ml_ctx) {
+ ret = flb_ml_append_text(ctx->ml_ctx,
+ file->ml_stream_id,
+ &out_time,
+ line,
+ line_len);
+ goto go_next;
+ }
+ else if (ctx->docker_mode) {
+ ret = flb_tail_dmode_process_content(now, line, line_len,
+ &repl_line, &repl_line_len,
+ file, ctx);
+ if (ret >= 0) {
+ if (repl_line == line) {
+ repl_line = NULL;
+ }
+ else {
+ line = repl_line;
+ line_len = repl_line_len;
+ }
+ /* Skip normal parsers flow */
+ goto go_next;
+ }
+ else {
+ flb_tail_dmode_flush(file, ctx);
+ }
+ }
+
+#ifdef FLB_HAVE_PARSER
+ if (ctx->parser) {
+ /* Common parser (non-multiline) */
+ ret = flb_parser_do(ctx->parser, line, line_len,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ if (flb_time_to_nanosec(&out_time) == 0L) {
+ flb_time_get(&out_time);
+ }
+
+ /* If multiline is enabled, flush any buffered data */
+ if (ctx->multiline == FLB_TRUE) {
+ flb_tail_mult_flush(file, ctx);
+ }
+
+ flb_tail_pack_line_map(&out_time,
+ (char**) &out_buf, &out_size, file,
+ processed_bytes);
+
+ flb_free(out_buf);
+ }
+ else {
+ /* Parser failed, pack raw text */
+ flb_tail_file_pack_line(NULL, data, len, file, processed_bytes);
+ }
+ }
+ else if (ctx->multiline == FLB_TRUE) {
+ ret = flb_tail_mult_process_content(now,
+ line, line_len,
+ file, ctx, processed_bytes);
+
+ /* No multiline */
+ if (ret == FLB_TAIL_MULT_NA) {
+ flb_tail_mult_flush(file, ctx);
+
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+ }
+ else if (ret == FLB_TAIL_MULT_MORE) {
+ /* we need more data, do nothing */
+ goto go_next;
+ }
+ else if (ret == FLB_TAIL_MULT_DONE) {
+ /* Finalized */
+ }
+ }
+ else {
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+ }
+#else
+ flb_tail_file_pack_line(NULL,
+ line, line_len, file, processed_bytes);
+#endif
+
+ go_next:
+ flb_free(repl_line);
+ repl_line = NULL;
+ /* Adjust counters */
+ data += len + 1;
+ processed_bytes += len + 1;
+ lines++;
+ file->parsed = 0;
+ file->last_processed_bytes += processed_bytes;
+ }
+ file->parsed = file->buf_len;
+
+ if (lines > 0) {
+ /* Append buffer content to a chunk */
+ *bytes = processed_bytes;
+
+ if (file->sl_log_event_encoder->output_length > 0) {
+ flb_input_log_append_records(ctx->ins,
+ lines,
+ file->tag_buf,
+ file->tag_len,
+ file->sl_log_event_encoder->output_buffer,
+ file->sl_log_event_encoder->output_length);
+
+ flb_log_event_encoder_reset(file->sl_log_event_encoder);
+ }
+ }
+ else if (file->skip_next) {
+ *bytes = file->buf_len;
+ }
+ else {
+ *bytes = processed_bytes;
+ }
+
+ if (ctx->ml_ctx) {
+ ml_stream_buffer_flush(ctx, file);
+ }
+
+ return lines;
+}
+
+static inline void drop_bytes(char *buf, size_t len, int pos, int bytes)
+{
+ memmove(buf + pos,
+ buf + pos + bytes,
+ len - pos - bytes);
+}
+
+#ifdef FLB_HAVE_REGEX
+static void cb_results(const char *name, const char *value,
+ size_t vlen, void *data)
+{
+ struct flb_hash_table *ht = data;
+
+ if (vlen == 0) {
+ return;
+ }
+
+ flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen);
+}
+#endif
+
+#ifdef FLB_HAVE_REGEX
+static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname,
+ char *out_buf, size_t *out_size,
+ struct flb_tail_config *ctx)
+#else
+static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size,
+ struct flb_tail_config *ctx)
+#endif
+{
+ int i;
+ size_t len;
+ char *p;
+ size_t buf_s = 0;
+#ifdef FLB_HAVE_REGEX
+ ssize_t n;
+ struct flb_regex_search result;
+ struct flb_hash_table *ht;
+ char *beg;
+ char *end;
+ int ret;
+ const char *tmp;
+ size_t tmp_s;
+#endif
+
+#ifdef FLB_HAVE_REGEX
+ if (tag_regex) {
+ n = flb_regex_do(tag_regex, fname, strlen(fname), &result);
+ if (n <= 0) {
+ flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s",
+ fname);
+ return -1;
+ }
+ else {
+ ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
+ FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE);
+ flb_regex_parse(tag_regex, &result, cb_results, ht);
+
+ for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) {
+ if (beg != p) {
+ len = (beg - p);
+ memcpy(out_buf + buf_s, p, len);
+ buf_s += len;
+ }
+
+ beg++;
+
+ end = strchr(beg, '>');
+ if (end && !memchr(beg, '<', end - beg)) {
+ end--;
+
+ len = end - beg + 1;
+ ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s);
+ if (ret != -1) {
+ memcpy(out_buf + buf_s, tmp, tmp_s);
+ buf_s += tmp_s;
+ }
+ else {
+ memcpy(out_buf + buf_s, "_", 1);
+ buf_s++;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "missing closing angle bracket in tag %s "
+ "at position %lu", tag, beg - tag);
+ flb_hash_table_destroy(ht);
+ return -1;
+ }
+ }
+
+ flb_hash_table_destroy(ht);
+ if (*p) {
+ len = strlen(p);
+ memcpy(out_buf + buf_s, p, len);
+ buf_s += len;
+ }
+ }
+ }
+ else {
+#endif
+ p = strchr(tag, '*');
+ if (!p) {
+ return -1;
+ }
+
+ /* Copy tag prefix if any */
+ len = (p - tag);
+ if (len > 0) {
+ memcpy(out_buf, tag, len);
+ buf_s += len;
+ }
+
+ /* Append file name */
+ len = strlen(fname);
+ memcpy(out_buf + buf_s, fname, len);
+ buf_s += len;
+
+ /* Tag suffix (if any) */
+ p++;
+ if (*p) {
+ len = strlen(tag);
+ memcpy(out_buf + buf_s, p, (len - (p - tag)));
+ buf_s += (len - (p - tag));
+ }
+
+ /* Sanitize buffer */
+ for (i = 0; i < buf_s; i++) {
+ if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') {
+ if (i > 0) {
+ out_buf[i] = '.';
+ }
+ else {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+
+ if (i > 0 && out_buf[i] == '.') {
+ if (out_buf[i - 1] == '.') {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+ else if (out_buf[i] == '*') {
+ drop_bytes(out_buf, buf_s, i, 1);
+ buf_s--;
+ i--;
+ }
+ }
+
+ /* Check for an ending '.' */
+ if (out_buf[buf_s - 1] == '.') {
+ drop_bytes(out_buf, buf_s, buf_s - 1, 1);
+ buf_s--;
+ }
+#ifdef FLB_HAVE_REGEX
+ }
+#endif
+
+ out_buf[buf_s] = '\0';
+ *out_size = buf_s;
+
+ return 0;
+}
+
+static inline int flb_tail_file_exists(struct stat *st,
+ struct flb_tail_config *ctx)
+{
+ int ret;
+ uint64_t hash;
+
+ ret = stat_to_hash_bits(ctx, st, &hash);
+ if (ret != 0) {
+ return -1;
+ }
+
+ /* static hash */
+ if (flb_hash_table_exists(ctx->static_hash, hash)) {
+ return FLB_TRUE;
+ }
+
+ /* event hash */
+ if (flb_hash_table_exists(ctx->event_hash, hash)) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/*
+ * Based in the configuration or database offset, set the proper 'offset' for the
+ * file in question.
+ */
+static int set_file_position(struct flb_tail_config *ctx,
+ struct flb_tail_file *file)
+{
+ int64_t ret;
+
+#ifdef FLB_HAVE_SQLDB
+ /*
+ * If the database option is enabled, try to gather the file position. The
+ * database function updates the file->offset entry.
+ */
+ if (ctx->db) {
+ ret = flb_tail_db_file_set(file, ctx);
+ if (ret == 0) {
+ if (file->offset > 0) {
+ ret = lseek(file->fd, file->offset, SEEK_SET);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ }
+ else if (ctx->read_from_head == FLB_FALSE) {
+ ret = lseek(file->fd, 0, SEEK_END);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ file->offset = ret;
+ flb_tail_db_file_offset(file, ctx);
+ }
+ return 0;
+ }
+ }
+#endif
+
+ if (ctx->read_from_head == FLB_TRUE) {
+ /* no need to seek, offset position is already zero */
+ return 0;
+ }
+
+ /* tail... */
+ ret = lseek(file->fd, 0, SEEK_END);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ file->offset = ret;
+
+ return 0;
+}
+
+/* Multiline flush callback: invoked every time some content is complete */
+static int ml_flush_callback(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ int result;
+ size_t mult_size = 0;
+ char *mult_buf = NULL;
+ struct flb_tail_file *file = data;
+ struct flb_tail_config *ctx = file->config;
+
+ if (ctx->path_key == NULL && ctx->offset_key == NULL) {
+ ml_stream_buffer_append(file, buf_data, buf_size);
+ }
+ else {
+ /* adjust the records in a new buffer */
+ result = record_append_custom_keys(file,
+ buf_data,
+ buf_size,
+ &mult_buf,
+ &mult_size);
+
+ if (result < 0) {
+ ml_stream_buffer_append(file, buf_data, buf_size);
+ }
+ else {
+ ml_stream_buffer_append(file, mult_buf, mult_size);
+
+ flb_free(mult_buf);
+ }
+ }
+
+ if (mst->forced_flush) {
+ ml_stream_buffer_flush(ctx, file);
+ }
+
+ return 0;
+}
+
+int flb_tail_file_append(char *path, struct stat *st, int mode,
+ struct flb_tail_config *ctx)
+{
+ int fd;
+ int ret;
+ uint64_t stream_id;
+ uint64_t ts;
+ uint64_t hash_bits;
+ flb_sds_t hash_key;
+ size_t len;
+ char *tag;
+ char *name;
+ size_t tag_len;
+ struct flb_tail_file *file;
+ struct stat lst;
+ flb_sds_t inode_str;
+
+ if (!S_ISREG(st->st_mode)) {
+ return -1;
+ }
+
+ if (flb_tail_file_exists(st, ctx) == FLB_TRUE) {
+ return -1;
+ }
+
+ fd = open(path, O_RDONLY);
+ if (fd == -1) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot open %s", path);
+ return -1;
+ }
+
+ file = flb_calloc(1, sizeof(struct flb_tail_file));
+ if (!file) {
+ flb_errno();
+ goto error;
+ }
+
+ /* Initialize */
+ file->watch_fd = -1;
+ file->fd = fd;
+
+ /* On non-windows environments check if the original path is a link */
+ ret = lstat(path, &lst);
+ if (ret == 0) {
+ if (S_ISLNK(lst.st_mode)) {
+ file->is_link = FLB_TRUE;
+ file->link_inode = lst.st_ino;
+ }
+ }
+
+ /* get unique hash for this file */
+ ret = stat_to_hash_bits(ctx, st, &hash_bits);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path);
+ goto error;
+ }
+ file->hash_bits = hash_bits;
+
+ /* store the hash key used for hash_bits */
+ ret = stat_to_hash_key(ctx, st, &hash_key);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path);
+ goto error;
+ }
+ file->hash_key = hash_key;
+
+ file->inode = st->st_ino;
+ file->offset = 0;
+ file->size = st->st_size;
+ file->buf_len = 0;
+ file->parsed = 0;
+ file->config = ctx;
+ file->tail_mode = mode;
+ file->tag_len = 0;
+ file->tag_buf = NULL;
+ file->rotated = 0;
+ file->pending_bytes = 0;
+ file->mult_firstline = FLB_FALSE;
+ file->mult_keys = 0;
+ file->mult_flush_timeout = 0;
+ file->mult_skipping = FLB_FALSE;
+
+ /*
+ * Duplicate string into 'file' structure, the called function
+ * take cares to resolve real-name of the file in case we are
+ * running in a non-Linux system.
+ *
+ * Depending of the operating system, the way to obtain the file
+ * name associated to it file descriptor can have different behaviors
+ * specifically if it root path it's under a symbolic link. On Linux
+ * we can trust the file name but in others it's better to solve it
+ * with some extra calls.
+ */
+ ret = flb_tail_file_name_dup(path, file);
+ if (!file->name) {
+ flb_errno();
+ goto error;
+ }
+
+ /* We keep a copy of the initial filename in orig_name. This is required
+ * for path_key to continue working after rotation. */
+ file->orig_name = flb_strdup(file->name);
+ if (!file->orig_name) {
+ flb_free(file->name);
+ flb_errno();
+ goto error;
+ }
+ file->orig_name_len = file->name_len;
+
+ /* multiline msgpack buffers */
+ file->mult_records = 0;
+ msgpack_sbuffer_init(&file->mult_sbuf);
+ msgpack_packer_init(&file->mult_pck, &file->mult_sbuf,
+ msgpack_sbuffer_write);
+
+ /* docker mode */
+ file->dmode_flush_timeout = 0;
+ file->dmode_complete = true;
+ file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
+ file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0);
+ file->dmode_firstline = false;
+#ifdef FLB_HAVE_SQLDB
+ file->db_id = 0;
+#endif
+ file->skip_next = FLB_FALSE;
+ file->skip_warn = FLB_FALSE;
+
+ /* Multiline core mode */
+ if (ctx->ml_ctx) {
+ /*
+ * Create inode str to get stream_id.
+ *
+ * If stream_id is created by filename,
+ * it will be same after file rotation and it causes invalid destruction:
+ *
+ * - https://github.com/fluent/fluent-bit/issues/4190
+ */
+ inode_str = flb_sds_create_size(64);
+ flb_sds_printf(&inode_str, "%"PRIu64, file->inode);
+
+ /* Create a stream for this file */
+ ret = flb_ml_stream_create(ctx->ml_ctx,
+ inode_str, flb_sds_len(inode_str),
+ ml_flush_callback, file,
+ &stream_id);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins,
+ "could not create multiline stream for file: %s",
+ inode_str);
+ flb_sds_destroy(inode_str);
+ goto error;
+ }
+ file->ml_stream_id = stream_id;
+ flb_sds_destroy(inode_str);
+
+ /*
+ * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready
+ * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In
+ * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline
+ * without any previous buffering which leads to performance degradation.
+ *
+ * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish
+ * processing the "read() bytes".
+ */
+ }
+
+ /* Local buffer */
+ file->buf_size = ctx->buf_chunk_size;
+ file->buf_data = flb_malloc(file->buf_size);
+ if (!file->buf_data) {
+ flb_errno();
+ goto error;
+ }
+
+ /* Initialize (optional) dynamic tag */
+ if (ctx->dynamic_tag == FLB_TRUE) {
+ len = ctx->ins->tag_len + strlen(path) + 1;
+ tag = flb_malloc(len);
+ if (!tag) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "failed to allocate tag buffer");
+ goto error;
+ }
+#ifdef FLB_HAVE_REGEX
+ ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx);
+#else
+ ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx);
+#endif
+ if (ret == 0) {
+ file->tag_len = tag_len;
+ file->tag_buf = flb_strdup(tag);
+ }
+ flb_free(tag);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path);
+ goto error;
+ }
+ }
+ else {
+ file->tag_len = strlen(ctx->ins->tag);
+ file->tag_buf = flb_strdup(ctx->ins->tag);
+ }
+ if (!file->tag_buf) {
+ flb_plg_error(ctx->ins, "failed to set tag for file: %s", path);
+ flb_errno();
+ goto error;
+ }
+
+ if (mode == FLB_TAIL_STATIC) {
+ mk_list_add(&file->_head, &ctx->files_static);
+ ctx->files_static_count++;
+ flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+ tail_signal_manager(file->config);
+ }
+ else if (mode == FLB_TAIL_EVENT) {
+ mk_list_add(&file->_head, &ctx->files_event);
+ flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+
+ /* Register this file into the fs_event monitoring */
+ ret = flb_tail_fs_add(ctx, file);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register file into fs_events");
+ goto error;
+ }
+ }
+
+ /* Set the file position (database offset, head or tail) */
+ ret = set_file_position(ctx, file);
+ if (ret == -1) {
+ flb_tail_file_remove(file);
+ goto error;
+ }
+
+ /* Remaining bytes to read */
+ file->pending_bytes = file->size - file->offset;
+
+#ifdef FLB_HAVE_METRICS
+ name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name});
+
+ /* Old api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
+#endif
+
+ file->sl_log_event_encoder = flb_log_event_encoder_create(
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (file->sl_log_event_encoder == NULL) {
+ flb_tail_file_remove(file);
+
+ goto error;
+ }
+
+ file->ml_log_event_encoder = flb_log_event_encoder_create(
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (file->ml_log_event_encoder == NULL) {
+ flb_tail_file_remove(file);
+
+ goto error;
+ }
+
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" with offset=%"PRId64" appended as %s",
+ file->inode, file->offset, path);
+ return 0;
+
+error:
+ if (file) {
+ if (file->buf_data) {
+ flb_free(file->buf_data);
+ }
+ if (file->name) {
+ flb_free(file->name);
+ }
+ flb_free(file);
+ }
+ close(fd);
+
+ return -1;
+}
+
+void flb_tail_file_remove(struct flb_tail_file *file)
+{
+ uint64_t ts;
+ char *name;
+ struct flb_tail_config *ctx;
+
+ ctx = file->config;
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s",
+ file->inode, file->name);
+
+ /* remove the multiline.core stream */
+ if (ctx->ml_ctx && file->ml_stream_id > 0) {
+ /* destroy ml stream */
+ flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id);
+ }
+
+ if (file->rotated > 0) {
+#ifdef FLB_HAVE_SQLDB
+ /*
+ * Make sure to remove a the file entry from the database if the file
+ * was rotated and it's not longer being monitored.
+ */
+ if (ctx->db) {
+ flb_tail_db_file_delete(file, file->config);
+ }
+#endif
+ mk_list_del(&file->_rotate_head);
+ }
+
+ msgpack_sbuffer_destroy(&file->mult_sbuf);
+
+ if (file->sl_log_event_encoder != NULL) {
+ flb_log_event_encoder_destroy(file->sl_log_event_encoder);
+ }
+
+ if (file->ml_log_event_encoder != NULL) {
+ flb_log_event_encoder_destroy(file->ml_log_event_encoder);
+ }
+
+ flb_sds_destroy(file->dmode_buf);
+ flb_sds_destroy(file->dmode_lastline);
+ mk_list_del(&file->_head);
+ flb_tail_fs_remove(ctx, file);
+
+ /* avoid deleting file with -1 fd */
+ if (file->fd != -1) {
+ close(file->fd);
+ }
+ if (file->tag_buf) {
+ flb_free(file->tag_buf);
+ }
+
+ /* remove any potential entry from the hash tables */
+ flb_hash_table_del(ctx->static_hash, file->hash_key);
+ flb_hash_table_del(ctx->event_hash, file->hash_key);
+
+ flb_free(file->buf_data);
+ flb_free(file->name);
+ flb_free(file->orig_name);
+ flb_free(file->real_name);
+ flb_sds_destroy(file->hash_key);
+
+#ifdef FLB_HAVE_METRICS
+ name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name});
+
+ /* old api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics);
+#endif
+
+ flb_free(file);
+}
+
+int flb_tail_file_remove_all(struct flb_tail_config *ctx)
+{
+ int count = 0;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_tail_file *file;
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ flb_tail_file_remove(file);
+ count++;
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ flb_tail_file_remove(file);
+ count++;
+ }
+
+ return count;
+}
+
+static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file)
+{
+ int ret;
+ int64_t offset;
+ struct stat st;
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Check if the file was truncated */
+ if (file->offset > st.st_size) {
+ offset = lseek(file->fd, 0, SEEK_SET);
+ if (offset == -1) {
+ flb_errno();
+ return FLB_TAIL_ERROR;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
+ file->inode, file->name);
+ file->offset = offset;
+ file->buf_len = 0;
+
+ /* Update offset in the database file */
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ flb_tail_db_file_offset(file, ctx);
+ }
+#endif
+ }
+ else {
+ file->size = st.st_size;
+ file->pending_bytes = (st.st_size - file->offset);
+ }
+
+ return FLB_TAIL_OK;
+}
+
+int flb_tail_file_chunk(struct flb_tail_file *file)
+{
+ int ret;
+ char *tmp;
+ size_t size;
+ size_t capacity;
+ size_t processed_bytes;
+ ssize_t bytes;
+ struct flb_tail_config *ctx;
+
+ /* Check if we the engine issued a pause */
+ ctx = file->config;
+ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
+ return FLB_TAIL_BUSY;
+ }
+
+ capacity = (file->buf_size - file->buf_len) - 1;
+ if (capacity < 1) {
+ /*
+ * If there is no more room for more data, try to increase the
+ * buffer under the limit of buffer_max_size.
+ */
+ if (file->buf_size >= ctx->buf_max_size) {
+ if (ctx->skip_long_lines == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "
+ "lines are too long. Skipping file.", file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Warn the user */
+ if (file->skip_warn == FLB_FALSE) {
+ flb_plg_warn(ctx->ins, "file=%s have long lines. "
+ "Skipping long lines.", file->name);
+ file->skip_warn = FLB_TRUE;
+ }
+
+ /* Do buffer adjustments */
+ file->offset += file->buf_len;
+ file->buf_len = 0;
+ file->skip_next = FLB_TRUE;
+ }
+ else {
+ size = file->buf_size + ctx->buf_chunk_size;
+ if (size > ctx->buf_max_size) {
+ size = ctx->buf_max_size;
+ }
+
+ /* Increase the buffer size */
+ tmp = flb_realloc(file->buf_data, size);
+ if (tmp) {
+ flb_plg_trace(ctx->ins, "file=%s increase buffer size "
+ "%lu => %lu bytes",
+ file->name, file->buf_size, size);
+ file->buf_data = tmp;
+ file->buf_size = size;
+ }
+ else {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot increase buffer size for %s, "
+ "skipping file.", file->name);
+ return FLB_TAIL_ERROR;
+ }
+ }
+ capacity = (file->buf_size - file->buf_len) - 1;
+ }
+
+ bytes = read(file->fd, file->buf_data + file->buf_len, capacity);
+ if (bytes > 0) {
+ /* we read some data, let the content processor take care of it */
+ file->buf_len += bytes;
+ file->buf_data[file->buf_len] = '\0';
+
+ /* Now that we have some data in the buffer, call the data processor
+ * which aims to cut lines and register the entries into the engine.
+ *
+ * The returned value is the absolute offset the file must be seek
+ * now. It may need to get back a few bytes at the beginning of a new
+ * line.
+ */
+ ret = process_content(file, &processed_bytes);
+ if (ret < 0) {
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
+ file->inode, file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ /* Adjust the file offset and buffer */
+ file->offset += processed_bytes;
+ consume_bytes(file->buf_data, processed_bytes, file->buf_len);
+ file->buf_len -= processed_bytes;
+ file->buf_data[file->buf_len] = '\0';
+
+#ifdef FLB_HAVE_SQLDB
+ if (file->config->db) {
+ flb_tail_db_file_offset(file, file->config);
+ }
+#endif
+
+ /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
+ ret = adjust_counters(ctx, file);
+
+ /* Data was consumed but likely some bytes still remain */
+ return ret;
+ }
+ else if (bytes == 0) {
+ /* We reached the end of file, let's wait for some incoming data */
+ ret = adjust_counters(ctx, file);
+ if (ret == FLB_TAIL_OK) {
+ return FLB_TAIL_WAIT;
+ }
+ else {
+ return FLB_TAIL_ERROR;
+ }
+ }
+ else {
+ /* error */
+ flb_errno();
+ flb_plg_error(ctx->ins, "error reading %s", file->name);
+ return FLB_TAIL_ERROR;
+ }
+
+ return FLB_TAIL_ERROR;
+}
+
+/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */
+int flb_tail_file_is_rotated(struct flb_tail_config *ctx,
+ struct flb_tail_file *file)
+{
+ int ret;
+ char *name;
+ struct stat st;
+
+ /*
+ * Do not double-check already rotated files since the caller of this
+ * function will trigger a rotation.
+ */
+ if (file->rotated != 0) {
+ return FLB_FALSE;
+ }
+
+ /* Check if the 'original monitored file' is a link and rotated */
+ if (file->is_link == FLB_TRUE) {
+ ret = lstat(file->name, &st);
+ if (ret == -1) {
+ /* Broken link or missing file */
+ if (errno == ENOENT) {
+ flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s",
+ file->link_inode, file->name);
+ return FLB_TRUE;
+ }
+ else {
+ flb_errno();
+ flb_plg_error(ctx->ins,
+ "link_inode=%"PRIu64" cannot detect if file: %s",
+ file->link_inode, file->name);
+ return -1;
+ }
+ }
+ else {
+ /* The file name is there, check if the same that we have */
+ if (st.st_ino != file->link_inode) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ /* Retrieve the real file name, operating system lookup */
+ name = flb_tail_file_name(file);
+ if (!name) {
+ flb_plg_error(ctx->ins,
+ "inode=%"PRIu64" cannot detect if file was rotated: %s",
+ file->inode, file->name);
+ return -1;
+ }
+
+
+ /* Get stats from the file name */
+ ret = stat(name, &st);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(name);
+ return -1;
+ }
+
+ /* Compare inodes and names */
+ if (file->inode == st.st_ino &&
+ flb_tail_target_file_name_cmp(name, file) == 0) {
+ flb_free(name);
+ return FLB_FALSE;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s",
+ file->inode, file->name, name);
+
+ flb_free(name);
+ return FLB_TRUE;
+}
+
+/* Promote a event in the static list to the dynamic 'events' interface */
+int flb_tail_file_to_event(struct flb_tail_file *file)
+{
+ int ret;
+ struct stat st;
+ struct flb_tail_config *ctx = file->config;
+
+ /* Check if the file promoted have pending bytes */
+ ret = fstat(file->fd, &st);
+ if (ret != 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (file->offset < st.st_size) {
+ file->pending_bytes = (st.st_size - file->offset);
+ tail_signal_pending(file->config);
+ }
+ else {
+ file->pending_bytes = 0;
+ }
+
+ /* Check if the file has been rotated */
+ ret = flb_tail_file_is_rotated(ctx, file);
+ if (ret == FLB_TRUE) {
+ flb_tail_file_rotated(file);
+ }
+
+ /* Notify the fs-event handler that we will start monitoring this 'file' */
+ ret = flb_tail_fs_add(ctx, file);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* List swap: change from 'static' to 'event' list */
+ mk_list_del(&file->_head);
+ ctx->files_static_count--;
+ flb_hash_table_del(ctx->static_hash, file->hash_key);
+
+ mk_list_add(&file->_head, &file->config->files_event);
+ flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
+ file, sizeof(file));
+
+ file->tail_mode = FLB_TAIL_EVENT;
+
+ return 0;
+}
+
+/*
+ * Given an open file descriptor, return the filename. This function is a
+ * bit slow and it aims to be used only when a file is rotated.
+ */
+char *flb_tail_file_name(struct flb_tail_file *file)
+{
+ int ret;
+ char *buf;
+#ifdef __linux__
+ ssize_t s;
+ char tmp[128];
+#elif defined(__APPLE__)
+ char path[PATH_MAX];
+#elif defined(FLB_SYSTEM_WINDOWS)
+ HANDLE h;
+#elif defined(FLB_SYSTEM_FREEBSD)
+ struct kinfo_file *file_entries;
+ int file_count;
+ int file_index;
+#endif
+
+ buf = flb_malloc(PATH_MAX);
+ if (!buf) {
+ flb_errno();
+ return NULL;
+ }
+
+#ifdef __linux__
+ ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ s = readlink(tmp, buf, PATH_MAX);
+ if (s == -1) {
+ flb_free(buf);
+ flb_errno();
+ return NULL;
+ }
+ buf[s] = '\0';
+
+#elif __APPLE__
+ int len;
+
+ ret = fcntl(file->fd, F_GETPATH, path);
+ if (ret == -1) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ len = strlen(path);
+ memcpy(buf, path, len);
+ buf[len] = '\0';
+
+#elif defined(FLB_SYSTEM_WINDOWS)
+ int len;
+
+ h = (HANDLE) _get_osfhandle(file->fd);
+ if (h == INVALID_HANDLE_VALUE) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ /* This function returns the length of the string excluding "\0"
+ * and the resulting path has a "\\?\" prefix.
+ */
+ len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED);
+ if (len == 0 || len >= PATH_MAX) {
+ flb_free(buf);
+ return NULL;
+ }
+
+ if (strstr(buf, "\\\\?\\")) {
+ memmove(buf, buf + 4, len + 1);
+ }
+#elif defined(FLB_SYSTEM_FREEBSD)
+ if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) {
+ flb_free(buf);
+ return NULL;
+ }
+
+ for (file_index=0; file_index < file_count; file_index++) {
+ if (file_entries[file_index].kf_fd == file->fd) {
+ strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1);
+ buf[PATH_MAX - 1] = 0;
+ break;
+ }
+ }
+ free(file_entries);
+#endif
+ return buf;
+}
+
+int flb_tail_file_name_dup(char *path, struct flb_tail_file *file)
+{
+ file->name = flb_strdup(path);
+ if (!file->name) {
+ flb_errno();
+ return -1;
+ }
+ file->name_len = strlen(file->name);
+
+ if (file->real_name) {
+ flb_free(file->real_name);
+ }
+
+ file->real_name = flb_tail_file_name(file);
+ if (!file->real_name) {
+ flb_errno();
+ flb_free(file->name);
+ file->name = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Invoked every time a file was rotated */
+int flb_tail_file_rotated(struct flb_tail_file *file)
+{
+ int ret;
+ uint64_t ts;
+ char *name;
+ char *i_name;
+ char *tmp;
+ struct stat st;
+ struct flb_tail_config *ctx = file->config;
+
+ /* Get the new file name */
+ name = flb_tail_file_name(file);
+ if (!name) {
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s",
+ file->inode, file->name, name);
+
+ /* Update local file entry */
+ tmp = file->name;
+ flb_tail_file_name_dup(name, file);
+ flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s",
+ file->inode, tmp, file->name);
+ if (file->rotated == 0) {
+ file->rotated = time(NULL);
+ mk_list_add(&file->_rotate_head, &file->config->files_rotated);
+
+ /* Rotate the file in the database */
+#ifdef FLB_HAVE_SQLDB
+ if (file->config->db) {
+ ret = flb_tail_db_file_rotate(name, file, file->config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not rotate file %s->%s in database",
+ file->name, name);
+ }
+ }
+#endif
+
+#ifdef FLB_HAVE_METRICS
+ i_name = (char *) flb_input_name(ctx->ins);
+ ts = cfl_time_now();
+ cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED,
+ 1, file->config->ins->metrics);
+#endif
+
+ /* Check if a new file has been created */
+ ret = stat(tmp, &st);
+ if (ret == 0 && st.st_ino != file->inode) {
+ if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) {
+ ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx);
+ if (ret == -1) {
+ flb_tail_scan(ctx->path_list, ctx);
+ }
+ else {
+ tail_signal_manager(file->config);
+ }
+ }
+ }
+ }
+ flb_free(tmp);
+ flb_free(name);
+
+ return 0;
+}
+
+static int check_purge_deleted_file(struct flb_tail_config *ctx,
+ struct flb_tail_file *file, time_t ts)
+{
+ int ret;
+ struct stat st;
+
+ ret = fstat(file->fd, &st);
+ if (ret == -1) {
+ flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name);
+ flb_tail_file_remove(file);
+ return FLB_TRUE;
+ }
+
+ if (st.st_nlink == 0) {
+ flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s",
+ file->name);
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ /* Remove file entry from the database */
+ flb_tail_db_file_delete(file, file->config);
+ }
+#endif
+ /* Remove file from the monitored list */
+ flb_tail_file_remove(file);
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/* Purge rotated and deleted files */
+int flb_tail_file_purge(struct flb_input_instance *ins,
+ struct flb_config *config, void *context)
+{
+ int ret;
+ int count = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_tail_file *file;
+ struct flb_tail_config *ctx = context;
+ time_t now;
+ struct stat st;
+
+ /* Rotated files */
+ now = time(NULL);
+ mk_list_foreach_safe(head, tmp, &ctx->files_rotated) {
+ file = mk_list_entry(head, struct flb_tail_file, _rotate_head);
+ if ((file->rotated + ctx->rotate_wait) <= now) {
+ ret = fstat(file->fd, &st);
+ if (ret == 0) {
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" purge rotated file %s " \
+ "(offset=%"PRId64" / size = %"PRIu64")",
+ file->inode, file->name, file->offset, (uint64_t)st.st_size);
+ if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) {
+ flb_plg_warn(ctx->ins, "purged rotated file while data "
+ "ingestion is paused, consider increasing "
+ "rotate_wait");
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins,
+ "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")",
+ file->inode, file->name, file->offset);
+ }
+
+ flb_tail_file_remove(file);
+ count++;
+ }
+ }
+
+ /*
+ * Deleted files: under high load scenarios, exists the chances that in
+ * our event loop we miss some notifications about a file. In order to
+ * sanitize our list of monitored files we will iterate all of them and check
+ * if they have been deleted or not.
+ */
+ mk_list_foreach_safe(head, tmp, &ctx->files_static) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ check_purge_deleted_file(ctx, file, now);
+ }
+ mk_list_foreach_safe(head, tmp, &ctx->files_event) {
+ file = mk_list_entry(head, struct flb_tail_file, _head);
+ check_purge_deleted_file(ctx, file, now);
+ }
+
+ return count;
+}