diff options
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_file.c')
-rw-r--r-- | fluent-bit/plugins/in_tail/tail_file.c | 1860 |
1 files changed, 1860 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c new file mode 100644 index 000000000..2385f0626 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_file.c @@ -0,0 +1,1860 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <time.h> +#ifdef FLB_SYSTEM_FREEBSD +#include <sys/user.h> +#include <libutil.h> +#endif + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_parser.h> +#ifdef FLB_HAVE_REGEX +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_hash_table.h> +#endif + +#include "tail.h" +#include "tail_file.h" +#include "tail_config.h" +#include "tail_db.h" +#include "tail_signal.h" +#include "tail_dockermode.h" +#include "tail_multiline.h" +#include "tail_scan.h" + +#ifdef FLB_SYSTEM_WINDOWS +#include "win32.h" +#endif + +#include <cfl/cfl.h> + +static inline void consume_bytes(char *buf, int bytes, int length) +{ + memmove(buf, buf + bytes, length - bytes); +} + +static uint64_t stat_get_st_dev(struct stat *st) +{ +#ifdef FLB_SYSTEM_WINDOWS + /* do you want to contribute with a way to extract volume serial number ? */ + return 0; +#else + return st->st_dev; +#endif +} + +static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st, + uint64_t *out_hash) +{ + int len; + uint64_t st_dev; + char tmp[64]; + + st_dev = stat_get_st_dev(st); + + len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64, + st_dev, (uint64_t)st->st_ino); + + *out_hash = cfl_hash_64bits(tmp, len); + return 0; +} + +static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st, + flb_sds_t *key) +{ + uint64_t st_dev; + flb_sds_t tmp; + flb_sds_t buf; + + buf = flb_sds_create_size(64); + if (!buf) { + return -1; + } + + st_dev = stat_get_st_dev(st); + tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64, + st_dev, (uint64_t)st->st_ino); + if (!tmp) { + flb_sds_destroy(buf); + return -1; + } + + *key = buf; + return 0; +} + +/* Append custom keys and report the number of records processed */ +static int record_append_custom_keys(struct flb_tail_file *file, + char *in_data, size_t in_size, + char **out_data, size_t *out_size) +{ + int i; + int ret; + int records = 0; + msgpack_object k; + msgpack_object v; + struct flb_log_event event; + struct flb_tail_config *ctx; + struct flb_log_event_encoder encoder; + struct flb_log_event_decoder decoder; + + ctx = (struct flb_tail_config *) file->config; + + ret = flb_log_event_decoder_init(&decoder, in_data, in_size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_decoder_destroy(&decoder); + + return -2; + } + + while (flb_log_event_decoder_next(&decoder, &event) == + FLB_EVENT_DECODER_SUCCESS) { + + ret = flb_log_event_encoder_begin_record(&encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp); + } + + /* append previous map keys */ + for (i = 0; i < event.body->via.map.size; i++) { + k = event.body->via.map.ptr[i].key; + v = event.body->via.map.ptr[i].val; + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_msgpack_object( + &encoder, + &k); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_msgpack_object( + &encoder, + &v); + } + } + + /* path_key */ + if (ctx->path_key != NULL) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->config->path_key); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->orig_name); + } + } + + /* offset_key */ + if (ctx->offset_key != NULL) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_cstring( + &encoder, + file->config->offset_key); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_uint64( + &encoder, + file->offset + + file->last_processed_bytes); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&encoder); + } + else { + flb_plg_error(file->config->ins, "error packing event : %d", ret); + + flb_log_event_encoder_rollback_record(&encoder); + } + + /* counter */ + records++; + } + + *out_data = encoder.output_buffer; + *out_size = encoder.output_length; + + /* This function transfers ownership of the internal memory allocated by + * sbuffer using msgpack_sbuffer_release which means the caller is + * responsible for releasing the memory. + */ + flb_log_event_encoder_claim_internal_buffer_ownership(&encoder); + + flb_log_event_decoder_destroy(&decoder); + flb_log_event_encoder_destroy(&encoder); + + return records; +} + +static int flb_tail_repack_map(struct flb_log_event_encoder *encoder, + char *data, + size_t data_size) +{ + msgpack_unpacked source_map; + size_t offset; + int result; + size_t index; + msgpack_object value; + msgpack_object key; + + result = FLB_EVENT_ENCODER_SUCCESS; + + if (data_size > 0) { + msgpack_unpacked_init(&source_map); + + offset = 0; + result = msgpack_unpack_next(&source_map, + data, + data_size, + &offset); + + if (result == MSGPACK_UNPACK_SUCCESS) { + result = FLB_EVENT_ENCODER_SUCCESS; + } + else { + result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; + } + + for (index = 0; + index < source_map.data.via.map.size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + key = source_map.data.via.map.ptr[index].key; + value = source_map.data.via.map.ptr[index].val; + + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &key); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &value); + } + } + + msgpack_unpacked_destroy(&source_map); + } + + return result; +} + +int flb_tail_pack_line_map(struct flb_time *time, char **data, + size_t *data_size, struct flb_tail_file *file, + size_t processed_bytes) +{ + int result; + + result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->sl_log_event_encoder, time); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_tail_repack_map(file->sl_log_event_encoder, + *data, + *data_size); + } + + /* path_key */ + if (file->config->path_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), + FLB_LOG_EVENT_STRING_VALUE(file->orig_name, + file->orig_name_len)); + } + } + + /* offset_key */ + if (file->config->offset_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), + FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); + } + else { + flb_log_event_encoder_rollback_record(file->sl_log_event_encoder); + } + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, "error packing event"); + + return -1; + } + + return 0; +} + +int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, + struct flb_tail_file *file, size_t processed_bytes) +{ + int result; + + result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->sl_log_event_encoder, time); + } + + /* path_key */ + if (file->config->path_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), + FLB_LOG_EVENT_STRING_VALUE(file->orig_name, + file->orig_name_len)); + } + } + + /* offset_key */ + if (file->config->offset_key != NULL) { + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), + FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + file->sl_log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(file->config->key), + FLB_LOG_EVENT_STRING_VALUE(data, + data_size)); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); + } + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, "error packing event : %d", result); + + return -1; + } + + return 0; +} + +static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size) +{ + int result; + + result = flb_log_event_encoder_emit_raw_record( + file->ml_log_event_encoder, + buf_data, buf_size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(file->config->ins, + "log event raw append error : %d", + result); + + return -1; + } + + return 0; +} + +static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ + if (file->ml_log_event_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->ml_log_event_encoder->output_buffer, + file->ml_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->ml_log_event_encoder); + } + + return 0; +} + +static int process_content(struct flb_tail_file *file, size_t *bytes) +{ + size_t len; + int lines = 0; + int ret; + size_t processed_bytes = 0; + char *data; + char *end; + char *p; + void *out_buf; + size_t out_size; + int crlf; + char *line; + size_t line_len; + char *repl_line; + size_t repl_line_len; + time_t now = time(NULL); + struct flb_time out_time = {0}; + struct flb_tail_config *ctx; + + ctx = (struct flb_tail_config *) file->config; + + /* Parse the data content */ + data = file->buf_data; + end = data + file->buf_len; + + /* reset last processed bytes */ + file->last_processed_bytes = 0; + + /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */ + while (data < end && *data == '\0') { + data++; + processed_bytes++; + } + + while (data < end && (p = memchr(data, '\n', end - data))) { + len = (p - data); + crlf = 0; + if (file->skip_next == FLB_TRUE) { + data += len + 1; + processed_bytes += len + 1; + file->skip_next = FLB_FALSE; + continue; + } + + /* + * Empty line (just breakline) + * --------------------------- + * [NOTE] with the new Multiline core feature and Multiline Filter on + * Fluent Bit v1.8.2, there are a couple of cases where stack traces + * or multi line patterns expects an empty line (meaning only the + * breakline), skipping empty lines on this plugin will break that + * functionality. + * + * We are introducing 'skip_empty_lines=off' configuration + * property to revert this behavior if some user is affected by + * this change. + */ + + if (len == 0 && ctx->skip_empty_lines) { + data++; + processed_bytes++; + continue; + } + + /* Process '\r\n' */ + if (len >= 2) { + crlf = (data[len-1] == '\r'); + if (len == 1 && crlf) { + data += 2; + processed_bytes += 2; + continue; + } + } + + /* Reset time for each line */ + flb_time_zero(&out_time); + + line = data; + line_len = len - crlf; + repl_line = NULL; + + if (ctx->ml_ctx) { + ret = flb_ml_append_text(ctx->ml_ctx, + file->ml_stream_id, + &out_time, + line, + line_len); + goto go_next; + } + else if (ctx->docker_mode) { + ret = flb_tail_dmode_process_content(now, line, line_len, + &repl_line, &repl_line_len, + file, ctx); + if (ret >= 0) { + if (repl_line == line) { + repl_line = NULL; + } + else { + line = repl_line; + line_len = repl_line_len; + } + /* Skip normal parsers flow */ + goto go_next; + } + else { + flb_tail_dmode_flush(file, ctx); + } + } + +#ifdef FLB_HAVE_PARSER + if (ctx->parser) { + /* Common parser (non-multiline) */ + ret = flb_parser_do(ctx->parser, line, line_len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + + /* If multiline is enabled, flush any buffered data */ + if (ctx->multiline == FLB_TRUE) { + flb_tail_mult_flush(file, ctx); + } + + flb_tail_pack_line_map(&out_time, + (char**) &out_buf, &out_size, file, + processed_bytes); + + flb_free(out_buf); + } + else { + /* Parser failed, pack raw text */ + flb_tail_file_pack_line(NULL, data, len, file, processed_bytes); + } + } + else if (ctx->multiline == FLB_TRUE) { + ret = flb_tail_mult_process_content(now, + line, line_len, + file, ctx, processed_bytes); + + /* No multiline */ + if (ret == FLB_TAIL_MULT_NA) { + flb_tail_mult_flush(file, ctx); + + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); + } + else if (ret == FLB_TAIL_MULT_MORE) { + /* we need more data, do nothing */ + goto go_next; + } + else if (ret == FLB_TAIL_MULT_DONE) { + /* Finalized */ + } + } + else { + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); + } +#else + flb_tail_file_pack_line(NULL, + line, line_len, file, processed_bytes); +#endif + + go_next: + flb_free(repl_line); + repl_line = NULL; + /* Adjust counters */ + data += len + 1; + processed_bytes += len + 1; + lines++; + file->parsed = 0; + file->last_processed_bytes += processed_bytes; + } + file->parsed = file->buf_len; + + if (lines > 0) { + /* Append buffer content to a chunk */ + *bytes = processed_bytes; + + if (file->sl_log_event_encoder->output_length > 0) { + flb_input_log_append_records(ctx->ins, + lines, + file->tag_buf, + file->tag_len, + file->sl_log_event_encoder->output_buffer, + file->sl_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->sl_log_event_encoder); + } + } + else if (file->skip_next) { + *bytes = file->buf_len; + } + else { + *bytes = processed_bytes; + } + + if (ctx->ml_ctx) { + ml_stream_buffer_flush(ctx, file); + } + + return lines; +} + +static inline void drop_bytes(char *buf, size_t len, int pos, int bytes) +{ + memmove(buf + pos, + buf + pos + bytes, + len - pos - bytes); +} + +#ifdef FLB_HAVE_REGEX +static void cb_results(const char *name, const char *value, + size_t vlen, void *data) +{ + struct flb_hash_table *ht = data; + + if (vlen == 0) { + return; + } + + flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen); +} +#endif + +#ifdef FLB_HAVE_REGEX +static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname, + char *out_buf, size_t *out_size, + struct flb_tail_config *ctx) +#else +static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size, + struct flb_tail_config *ctx) +#endif +{ + int i; + size_t len; + char *p; + size_t buf_s = 0; +#ifdef FLB_HAVE_REGEX + ssize_t n; + struct flb_regex_search result; + struct flb_hash_table *ht; + char *beg; + char *end; + int ret; + const char *tmp; + size_t tmp_s; +#endif + +#ifdef FLB_HAVE_REGEX + if (tag_regex) { + n = flb_regex_do(tag_regex, fname, strlen(fname), &result); + if (n <= 0) { + flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s", + fname); + return -1; + } + else { + ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, + FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE); + flb_regex_parse(tag_regex, &result, cb_results, ht); + + for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) { + if (beg != p) { + len = (beg - p); + memcpy(out_buf + buf_s, p, len); + buf_s += len; + } + + beg++; + + end = strchr(beg, '>'); + if (end && !memchr(beg, '<', end - beg)) { + end--; + + len = end - beg + 1; + ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s); + if (ret != -1) { + memcpy(out_buf + buf_s, tmp, tmp_s); + buf_s += tmp_s; + } + else { + memcpy(out_buf + buf_s, "_", 1); + buf_s++; + } + } + else { + flb_plg_error(ctx->ins, + "missing closing angle bracket in tag %s " + "at position %lu", tag, beg - tag); + flb_hash_table_destroy(ht); + return -1; + } + } + + flb_hash_table_destroy(ht); + if (*p) { + len = strlen(p); + memcpy(out_buf + buf_s, p, len); + buf_s += len; + } + } + } + else { +#endif + p = strchr(tag, '*'); + if (!p) { + return -1; + } + + /* Copy tag prefix if any */ + len = (p - tag); + if (len > 0) { + memcpy(out_buf, tag, len); + buf_s += len; + } + + /* Append file name */ + len = strlen(fname); + memcpy(out_buf + buf_s, fname, len); + buf_s += len; + + /* Tag suffix (if any) */ + p++; + if (*p) { + len = strlen(tag); + memcpy(out_buf + buf_s, p, (len - (p - tag))); + buf_s += (len - (p - tag)); + } + + /* Sanitize buffer */ + for (i = 0; i < buf_s; i++) { + if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') { + if (i > 0) { + out_buf[i] = '.'; + } + else { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + + if (i > 0 && out_buf[i] == '.') { + if (out_buf[i - 1] == '.') { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + else if (out_buf[i] == '*') { + drop_bytes(out_buf, buf_s, i, 1); + buf_s--; + i--; + } + } + + /* Check for an ending '.' */ + if (out_buf[buf_s - 1] == '.') { + drop_bytes(out_buf, buf_s, buf_s - 1, 1); + buf_s--; + } +#ifdef FLB_HAVE_REGEX + } +#endif + + out_buf[buf_s] = '\0'; + *out_size = buf_s; + + return 0; +} + +static inline int flb_tail_file_exists(struct stat *st, + struct flb_tail_config *ctx) +{ + int ret; + uint64_t hash; + + ret = stat_to_hash_bits(ctx, st, &hash); + if (ret != 0) { + return -1; + } + + /* static hash */ + if (flb_hash_table_exists(ctx->static_hash, hash)) { + return FLB_TRUE; + } + + /* event hash */ + if (flb_hash_table_exists(ctx->event_hash, hash)) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * Based in the configuration or database offset, set the proper 'offset' for the + * file in question. + */ +static int set_file_position(struct flb_tail_config *ctx, + struct flb_tail_file *file) +{ + int64_t ret; + +#ifdef FLB_HAVE_SQLDB + /* + * If the database option is enabled, try to gather the file position. The + * database function updates the file->offset entry. + */ + if (ctx->db) { + ret = flb_tail_db_file_set(file, ctx); + if (ret == 0) { + if (file->offset > 0) { + ret = lseek(file->fd, file->offset, SEEK_SET); + if (ret == -1) { + flb_errno(); + return -1; + } + } + else if (ctx->read_from_head == FLB_FALSE) { + ret = lseek(file->fd, 0, SEEK_END); + if (ret == -1) { + flb_errno(); + return -1; + } + file->offset = ret; + flb_tail_db_file_offset(file, ctx); + } + return 0; + } + } +#endif + + if (ctx->read_from_head == FLB_TRUE) { + /* no need to seek, offset position is already zero */ + return 0; + } + + /* tail... */ + ret = lseek(file->fd, 0, SEEK_END); + if (ret == -1) { + flb_errno(); + return -1; + } + file->offset = ret; + + return 0; +} + +/* Multiline flush callback: invoked every time some content is complete */ +static int ml_flush_callback(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + int result; + size_t mult_size = 0; + char *mult_buf = NULL; + struct flb_tail_file *file = data; + struct flb_tail_config *ctx = file->config; + + if (ctx->path_key == NULL && ctx->offset_key == NULL) { + ml_stream_buffer_append(file, buf_data, buf_size); + } + else { + /* adjust the records in a new buffer */ + result = record_append_custom_keys(file, + buf_data, + buf_size, + &mult_buf, + &mult_size); + + if (result < 0) { + ml_stream_buffer_append(file, buf_data, buf_size); + } + else { + ml_stream_buffer_append(file, mult_buf, mult_size); + + flb_free(mult_buf); + } + } + + if (mst->forced_flush) { + ml_stream_buffer_flush(ctx, file); + } + + return 0; +} + +int flb_tail_file_append(char *path, struct stat *st, int mode, + struct flb_tail_config *ctx) +{ + int fd; + int ret; + uint64_t stream_id; + uint64_t ts; + uint64_t hash_bits; + flb_sds_t hash_key; + size_t len; + char *tag; + char *name; + size_t tag_len; + struct flb_tail_file *file; + struct stat lst; + flb_sds_t inode_str; + + if (!S_ISREG(st->st_mode)) { + return -1; + } + + if (flb_tail_file_exists(st, ctx) == FLB_TRUE) { + return -1; + } + + fd = open(path, O_RDONLY); + if (fd == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot open %s", path); + return -1; + } + + file = flb_calloc(1, sizeof(struct flb_tail_file)); + if (!file) { + flb_errno(); + goto error; + } + + /* Initialize */ + file->watch_fd = -1; + file->fd = fd; + + /* On non-windows environments check if the original path is a link */ + ret = lstat(path, &lst); + if (ret == 0) { + if (S_ISLNK(lst.st_mode)) { + file->is_link = FLB_TRUE; + file->link_inode = lst.st_ino; + } + } + + /* get unique hash for this file */ + ret = stat_to_hash_bits(ctx, st, &hash_bits); + if (ret != 0) { + flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path); + goto error; + } + file->hash_bits = hash_bits; + + /* store the hash key used for hash_bits */ + ret = stat_to_hash_key(ctx, st, &hash_key); + if (ret != 0) { + flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path); + goto error; + } + file->hash_key = hash_key; + + file->inode = st->st_ino; + file->offset = 0; + file->size = st->st_size; + file->buf_len = 0; + file->parsed = 0; + file->config = ctx; + file->tail_mode = mode; + file->tag_len = 0; + file->tag_buf = NULL; + file->rotated = 0; + file->pending_bytes = 0; + file->mult_firstline = FLB_FALSE; + file->mult_keys = 0; + file->mult_flush_timeout = 0; + file->mult_skipping = FLB_FALSE; + + /* + * Duplicate string into 'file' structure, the called function + * take cares to resolve real-name of the file in case we are + * running in a non-Linux system. + * + * Depending of the operating system, the way to obtain the file + * name associated to it file descriptor can have different behaviors + * specifically if it root path it's under a symbolic link. On Linux + * we can trust the file name but in others it's better to solve it + * with some extra calls. + */ + ret = flb_tail_file_name_dup(path, file); + if (!file->name) { + flb_errno(); + goto error; + } + + /* We keep a copy of the initial filename in orig_name. This is required + * for path_key to continue working after rotation. */ + file->orig_name = flb_strdup(file->name); + if (!file->orig_name) { + flb_free(file->name); + flb_errno(); + goto error; + } + file->orig_name_len = file->name_len; + + /* multiline msgpack buffers */ + file->mult_records = 0; + msgpack_sbuffer_init(&file->mult_sbuf); + msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, + msgpack_sbuffer_write); + + /* docker mode */ + file->dmode_flush_timeout = 0; + file->dmode_complete = true; + file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); + file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); + file->dmode_firstline = false; +#ifdef FLB_HAVE_SQLDB + file->db_id = 0; +#endif + file->skip_next = FLB_FALSE; + file->skip_warn = FLB_FALSE; + + /* Multiline core mode */ + if (ctx->ml_ctx) { + /* + * Create inode str to get stream_id. + * + * If stream_id is created by filename, + * it will be same after file rotation and it causes invalid destruction: + * + * - https://github.com/fluent/fluent-bit/issues/4190 + */ + inode_str = flb_sds_create_size(64); + flb_sds_printf(&inode_str, "%"PRIu64, file->inode); + + /* Create a stream for this file */ + ret = flb_ml_stream_create(ctx->ml_ctx, + inode_str, flb_sds_len(inode_str), + ml_flush_callback, file, + &stream_id); + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not create multiline stream for file: %s", + inode_str); + flb_sds_destroy(inode_str); + goto error; + } + file->ml_stream_id = stream_id; + flb_sds_destroy(inode_str); + + /* + * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready + * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In + * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline + * without any previous buffering which leads to performance degradation. + * + * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish + * processing the "read() bytes". + */ + } + + /* Local buffer */ + file->buf_size = ctx->buf_chunk_size; + file->buf_data = flb_malloc(file->buf_size); + if (!file->buf_data) { + flb_errno(); + goto error; + } + + /* Initialize (optional) dynamic tag */ + if (ctx->dynamic_tag == FLB_TRUE) { + len = ctx->ins->tag_len + strlen(path) + 1; + tag = flb_malloc(len); + if (!tag) { + flb_errno(); + flb_plg_error(ctx->ins, "failed to allocate tag buffer"); + goto error; + } +#ifdef FLB_HAVE_REGEX + ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx); +#else + ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx); +#endif + if (ret == 0) { + file->tag_len = tag_len; + file->tag_buf = flb_strdup(tag); + } + flb_free(tag); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path); + goto error; + } + } + else { + file->tag_len = strlen(ctx->ins->tag); + file->tag_buf = flb_strdup(ctx->ins->tag); + } + if (!file->tag_buf) { + flb_plg_error(ctx->ins, "failed to set tag for file: %s", path); + flb_errno(); + goto error; + } + + if (mode == FLB_TAIL_STATIC) { + mk_list_add(&file->_head, &ctx->files_static); + ctx->files_static_count++; + flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + tail_signal_manager(file->config); + } + else if (mode == FLB_TAIL_EVENT) { + mk_list_add(&file->_head, &ctx->files_event); + flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + + /* Register this file into the fs_event monitoring */ + ret = flb_tail_fs_add(ctx, file); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register file into fs_events"); + goto error; + } + } + + /* Set the file position (database offset, head or tail) */ + ret = set_file_position(ctx, file); + if (ret == -1) { + flb_tail_file_remove(file); + goto error; + } + + /* Remaining bytes to read */ + file->pending_bytes = file->size - file->offset; + +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); + + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); +#endif + + file->sl_log_event_encoder = flb_log_event_encoder_create( + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (file->sl_log_event_encoder == NULL) { + flb_tail_file_remove(file); + + goto error; + } + + file->ml_log_event_encoder = flb_log_event_encoder_create( + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (file->ml_log_event_encoder == NULL) { + flb_tail_file_remove(file); + + goto error; + } + + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" with offset=%"PRId64" appended as %s", + file->inode, file->offset, path); + return 0; + +error: + if (file) { + if (file->buf_data) { + flb_free(file->buf_data); + } + if (file->name) { + flb_free(file->name); + } + flb_free(file); + } + close(fd); + + return -1; +} + +void flb_tail_file_remove(struct flb_tail_file *file) +{ + uint64_t ts; + char *name; + struct flb_tail_config *ctx; + + ctx = file->config; + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", + file->inode, file->name); + + /* remove the multiline.core stream */ + if (ctx->ml_ctx && file->ml_stream_id > 0) { + /* destroy ml stream */ + flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id); + } + + if (file->rotated > 0) { +#ifdef FLB_HAVE_SQLDB + /* + * Make sure to remove a the file entry from the database if the file + * was rotated and it's not longer being monitored. + */ + if (ctx->db) { + flb_tail_db_file_delete(file, file->config); + } +#endif + mk_list_del(&file->_rotate_head); + } + + msgpack_sbuffer_destroy(&file->mult_sbuf); + + if (file->sl_log_event_encoder != NULL) { + flb_log_event_encoder_destroy(file->sl_log_event_encoder); + } + + if (file->ml_log_event_encoder != NULL) { + flb_log_event_encoder_destroy(file->ml_log_event_encoder); + } + + flb_sds_destroy(file->dmode_buf); + flb_sds_destroy(file->dmode_lastline); + mk_list_del(&file->_head); + flb_tail_fs_remove(ctx, file); + + /* avoid deleting file with -1 fd */ + if (file->fd != -1) { + close(file->fd); + } + if (file->tag_buf) { + flb_free(file->tag_buf); + } + + /* remove any potential entry from the hash tables */ + flb_hash_table_del(ctx->static_hash, file->hash_key); + flb_hash_table_del(ctx->event_hash, file->hash_key); + + flb_free(file->buf_data); + flb_free(file->name); + flb_free(file->orig_name); + flb_free(file->real_name); + flb_sds_destroy(file->hash_key); + +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics); +#endif + + flb_free(file); +} + +int flb_tail_file_remove_all(struct flb_tail_config *ctx) +{ + int count = 0; + struct mk_list *head; + struct mk_list *tmp; + struct flb_tail_file *file; + + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + flb_tail_file_remove(file); + count++; + } + + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + flb_tail_file_remove(file); + count++; + } + + return count; +} + +static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ + int ret; + int64_t offset; + struct stat st; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + /* Check if the file was truncated */ + if (file->offset > st.st_size) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", + file->inode, file->name); + file->offset = offset; + file->buf_len = 0; + + /* Update offset in the database file */ +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + else { + file->size = st.st_size; + file->pending_bytes = (st.st_size - file->offset); + } + + return FLB_TAIL_OK; +} + +int flb_tail_file_chunk(struct flb_tail_file *file) +{ + int ret; + char *tmp; + size_t size; + size_t capacity; + size_t processed_bytes; + ssize_t bytes; + struct flb_tail_config *ctx; + + /* Check if we the engine issued a pause */ + ctx = file->config; + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + return FLB_TAIL_BUSY; + } + + capacity = (file->buf_size - file->buf_len) - 1; + if (capacity < 1) { + /* + * If there is no more room for more data, try to increase the + * buffer under the limit of buffer_max_size. + */ + if (file->buf_size >= ctx->buf_max_size) { + if (ctx->skip_long_lines == FLB_FALSE) { + flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " + "lines are too long. Skipping file.", file->name); + return FLB_TAIL_ERROR; + } + + /* Warn the user */ + if (file->skip_warn == FLB_FALSE) { + flb_plg_warn(ctx->ins, "file=%s have long lines. " + "Skipping long lines.", file->name); + file->skip_warn = FLB_TRUE; + } + + /* Do buffer adjustments */ + file->offset += file->buf_len; + file->buf_len = 0; + file->skip_next = FLB_TRUE; + } + else { + size = file->buf_size + ctx->buf_chunk_size; + if (size > ctx->buf_max_size) { + size = ctx->buf_max_size; + } + + /* Increase the buffer size */ + tmp = flb_realloc(file->buf_data, size); + if (tmp) { + flb_plg_trace(ctx->ins, "file=%s increase buffer size " + "%lu => %lu bytes", + file->name, file->buf_size, size); + file->buf_data = tmp; + file->buf_size = size; + } + else { + flb_errno(); + flb_plg_error(ctx->ins, "cannot increase buffer size for %s, " + "skipping file.", file->name); + return FLB_TAIL_ERROR; + } + } + capacity = (file->buf_size - file->buf_len) - 1; + } + + bytes = read(file->fd, file->buf_data + file->buf_len, capacity); + if (bytes > 0) { + /* we read some data, let the content processor take care of it */ + file->buf_len += bytes; + file->buf_data[file->buf_len] = '\0'; + + /* Now that we have some data in the buffer, call the data processor + * which aims to cut lines and register the entries into the engine. + * + * The returned value is the absolute offset the file must be seek + * now. It may need to get back a few bytes at the beginning of a new + * line. + */ + ret = process_content(file, &processed_bytes); + if (ret < 0) { + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", + file->inode, file->name); + return FLB_TAIL_ERROR; + } + + /* Adjust the file offset and buffer */ + file->offset += processed_bytes; + consume_bytes(file->buf_data, processed_bytes, file->buf_len); + file->buf_len -= processed_bytes; + file->buf_data[file->buf_len] = '\0'; + +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + flb_tail_db_file_offset(file, file->config); + } +#endif + + /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ + ret = adjust_counters(ctx, file); + + /* Data was consumed but likely some bytes still remain */ + return ret; + } + else if (bytes == 0) { + /* We reached the end of file, let's wait for some incoming data */ + ret = adjust_counters(ctx, file); + if (ret == FLB_TAIL_OK) { + return FLB_TAIL_WAIT; + } + else { + return FLB_TAIL_ERROR; + } + } + else { + /* error */ + flb_errno(); + flb_plg_error(ctx->ins, "error reading %s", file->name); + return FLB_TAIL_ERROR; + } + + return FLB_TAIL_ERROR; +} + +/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */ +int flb_tail_file_is_rotated(struct flb_tail_config *ctx, + struct flb_tail_file *file) +{ + int ret; + char *name; + struct stat st; + + /* + * Do not double-check already rotated files since the caller of this + * function will trigger a rotation. + */ + if (file->rotated != 0) { + return FLB_FALSE; + } + + /* Check if the 'original monitored file' is a link and rotated */ + if (file->is_link == FLB_TRUE) { + ret = lstat(file->name, &st); + if (ret == -1) { + /* Broken link or missing file */ + if (errno == ENOENT) { + flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s", + file->link_inode, file->name); + return FLB_TRUE; + } + else { + flb_errno(); + flb_plg_error(ctx->ins, + "link_inode=%"PRIu64" cannot detect if file: %s", + file->link_inode, file->name); + return -1; + } + } + else { + /* The file name is there, check if the same that we have */ + if (st.st_ino != file->link_inode) { + return FLB_TRUE; + } + } + } + + /* Retrieve the real file name, operating system lookup */ + name = flb_tail_file_name(file); + if (!name) { + flb_plg_error(ctx->ins, + "inode=%"PRIu64" cannot detect if file was rotated: %s", + file->inode, file->name); + return -1; + } + + + /* Get stats from the file name */ + ret = stat(name, &st); + if (ret == -1) { + flb_errno(); + flb_free(name); + return -1; + } + + /* Compare inodes and names */ + if (file->inode == st.st_ino && + flb_tail_target_file_name_cmp(name, file) == 0) { + flb_free(name); + return FLB_FALSE; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s", + file->inode, file->name, name); + + flb_free(name); + return FLB_TRUE; +} + +/* Promote a event in the static list to the dynamic 'events' interface */ +int flb_tail_file_to_event(struct flb_tail_file *file) +{ + int ret; + struct stat st; + struct flb_tail_config *ctx = file->config; + + /* Check if the file promoted have pending bytes */ + ret = fstat(file->fd, &st); + if (ret != 0) { + flb_errno(); + return -1; + } + + if (file->offset < st.st_size) { + file->pending_bytes = (st.st_size - file->offset); + tail_signal_pending(file->config); + } + else { + file->pending_bytes = 0; + } + + /* Check if the file has been rotated */ + ret = flb_tail_file_is_rotated(ctx, file); + if (ret == FLB_TRUE) { + flb_tail_file_rotated(file); + } + + /* Notify the fs-event handler that we will start monitoring this 'file' */ + ret = flb_tail_fs_add(ctx, file); + if (ret == -1) { + return -1; + } + + /* List swap: change from 'static' to 'event' list */ + mk_list_del(&file->_head); + ctx->files_static_count--; + flb_hash_table_del(ctx->static_hash, file->hash_key); + + mk_list_add(&file->_head, &file->config->files_event); + flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + + file->tail_mode = FLB_TAIL_EVENT; + + return 0; +} + +/* + * Given an open file descriptor, return the filename. This function is a + * bit slow and it aims to be used only when a file is rotated. + */ +char *flb_tail_file_name(struct flb_tail_file *file) +{ + int ret; + char *buf; +#ifdef __linux__ + ssize_t s; + char tmp[128]; +#elif defined(__APPLE__) + char path[PATH_MAX]; +#elif defined(FLB_SYSTEM_WINDOWS) + HANDLE h; +#elif defined(FLB_SYSTEM_FREEBSD) + struct kinfo_file *file_entries; + int file_count; + int file_index; +#endif + + buf = flb_malloc(PATH_MAX); + if (!buf) { + flb_errno(); + return NULL; + } + +#ifdef __linux__ + ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd); + if (ret == -1) { + flb_errno(); + flb_free(buf); + return NULL; + } + + s = readlink(tmp, buf, PATH_MAX); + if (s == -1) { + flb_free(buf); + flb_errno(); + return NULL; + } + buf[s] = '\0'; + +#elif __APPLE__ + int len; + + ret = fcntl(file->fd, F_GETPATH, path); + if (ret == -1) { + flb_errno(); + flb_free(buf); + return NULL; + } + + len = strlen(path); + memcpy(buf, path, len); + buf[len] = '\0'; + +#elif defined(FLB_SYSTEM_WINDOWS) + int len; + + h = (HANDLE) _get_osfhandle(file->fd); + if (h == INVALID_HANDLE_VALUE) { + flb_errno(); + flb_free(buf); + return NULL; + } + + /* This function returns the length of the string excluding "\0" + * and the resulting path has a "\\?\" prefix. + */ + len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED); + if (len == 0 || len >= PATH_MAX) { + flb_free(buf); + return NULL; + } + + if (strstr(buf, "\\\\?\\")) { + memmove(buf, buf + 4, len + 1); + } +#elif defined(FLB_SYSTEM_FREEBSD) + if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) { + flb_free(buf); + return NULL; + } + + for (file_index=0; file_index < file_count; file_index++) { + if (file_entries[file_index].kf_fd == file->fd) { + strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1); + buf[PATH_MAX - 1] = 0; + break; + } + } + free(file_entries); +#endif + return buf; +} + +int flb_tail_file_name_dup(char *path, struct flb_tail_file *file) +{ + file->name = flb_strdup(path); + if (!file->name) { + flb_errno(); + return -1; + } + file->name_len = strlen(file->name); + + if (file->real_name) { + flb_free(file->real_name); + } + + file->real_name = flb_tail_file_name(file); + if (!file->real_name) { + flb_errno(); + flb_free(file->name); + file->name = NULL; + return -1; + } + + return 0; +} + +/* Invoked every time a file was rotated */ +int flb_tail_file_rotated(struct flb_tail_file *file) +{ + int ret; + uint64_t ts; + char *name; + char *i_name; + char *tmp; + struct stat st; + struct flb_tail_config *ctx = file->config; + + /* Get the new file name */ + name = flb_tail_file_name(file); + if (!name) { + return -1; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s", + file->inode, file->name, name); + + /* Update local file entry */ + tmp = file->name; + flb_tail_file_name_dup(name, file); + flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s", + file->inode, tmp, file->name); + if (file->rotated == 0) { + file->rotated = time(NULL); + mk_list_add(&file->_rotate_head, &file->config->files_rotated); + + /* Rotate the file in the database */ +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + ret = flb_tail_db_file_rotate(name, file, file->config); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not rotate file %s->%s in database", + file->name, name); + } + } +#endif + +#ifdef FLB_HAVE_METRICS + i_name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name}); + + /* OLD api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED, + 1, file->config->ins->metrics); +#endif + + /* Check if a new file has been created */ + ret = stat(tmp, &st); + if (ret == 0 && st.st_ino != file->inode) { + if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) { + ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx); + if (ret == -1) { + flb_tail_scan(ctx->path_list, ctx); + } + else { + tail_signal_manager(file->config); + } + } + } + } + flb_free(tmp); + flb_free(name); + + return 0; +} + +static int check_purge_deleted_file(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t ts) +{ + int ret; + struct stat st; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); + flb_tail_file_remove(file); + return FLB_TRUE; + } + + if (st.st_nlink == 0) { + flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s", + file->name); +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + /* Remove file entry from the database */ + flb_tail_db_file_delete(file, file->config); + } +#endif + /* Remove file from the monitored list */ + flb_tail_file_remove(file); + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* Purge rotated and deleted files */ +int flb_tail_file_purge(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + int ret; + int count = 0; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + time_t now; + struct stat st; + + /* Rotated files */ + now = time(NULL); + mk_list_foreach_safe(head, tmp, &ctx->files_rotated) { + file = mk_list_entry(head, struct flb_tail_file, _rotate_head); + if ((file->rotated + ctx->rotate_wait) <= now) { + ret = fstat(file->fd, &st); + if (ret == 0) { + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" purge rotated file %s " \ + "(offset=%"PRId64" / size = %"PRIu64")", + file->inode, file->name, file->offset, (uint64_t)st.st_size); + if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) { + flb_plg_warn(ctx->ins, "purged rotated file while data " + "ingestion is paused, consider increasing " + "rotate_wait"); + } + } + else { + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")", + file->inode, file->name, file->offset); + } + + flb_tail_file_remove(file); + count++; + } + } + + /* + * Deleted files: under high load scenarios, exists the chances that in + * our event loop we miss some notifications about a file. In order to + * sanitize our list of monitored files we will iterate all of them and check + * if they have been deleted or not. + */ + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + check_purge_deleted_file(ctx, file, now); + } + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + check_purge_deleted_file(ctx, file, now); + } + + return count; +} |