diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/in_tail/tail_file.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_file.c')
-rw-r--r-- | fluent-bit/plugins/in_tail/tail_file.c | 1860 |
1 files changed, 0 insertions, 1860 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_file.c b/fluent-bit/plugins/in_tail/tail_file.c deleted file mode 100644 index 2385f0626..000000000 --- a/fluent-bit/plugins/in_tail/tail_file.c +++ /dev/null @@ -1,1860 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <time.h> -#ifdef FLB_SYSTEM_FREEBSD -#include <sys/user.h> -#include <libutil.h> -#endif - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_parser.h> -#ifdef FLB_HAVE_REGEX -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_hash_table.h> -#endif - -#include "tail.h" -#include "tail_file.h" -#include "tail_config.h" -#include "tail_db.h" -#include "tail_signal.h" -#include "tail_dockermode.h" -#include "tail_multiline.h" -#include "tail_scan.h" - -#ifdef FLB_SYSTEM_WINDOWS -#include "win32.h" -#endif - -#include <cfl/cfl.h> - -static inline void consume_bytes(char *buf, int bytes, int length) -{ - memmove(buf, buf + bytes, length - bytes); -} - -static uint64_t stat_get_st_dev(struct stat *st) -{ -#ifdef FLB_SYSTEM_WINDOWS - /* do you want to contribute with a way to extract volume serial number ? */ - return 0; -#else - return st->st_dev; -#endif -} - -static int stat_to_hash_bits(struct flb_tail_config *ctx, struct stat *st, - uint64_t *out_hash) -{ - int len; - uint64_t st_dev; - char tmp[64]; - - st_dev = stat_get_st_dev(st); - - len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 ":%" PRIu64, - st_dev, (uint64_t)st->st_ino); - - *out_hash = cfl_hash_64bits(tmp, len); - return 0; -} - -static int stat_to_hash_key(struct flb_tail_config *ctx, struct stat *st, - flb_sds_t *key) -{ - uint64_t st_dev; - flb_sds_t tmp; - flb_sds_t buf; - - buf = flb_sds_create_size(64); - if (!buf) { - return -1; - } - - st_dev = stat_get_st_dev(st); - tmp = flb_sds_printf(&buf, "%" PRIu64 ":%" PRIu64, - st_dev, (uint64_t)st->st_ino); - if (!tmp) { - flb_sds_destroy(buf); - return -1; - } - - *key = buf; - return 0; -} - -/* Append custom keys and report the number of records processed */ -static int record_append_custom_keys(struct flb_tail_file *file, - char *in_data, size_t in_size, - char **out_data, size_t *out_size) -{ - int i; - int ret; - int records = 0; - msgpack_object k; - msgpack_object v; - struct flb_log_event event; - struct flb_tail_config *ctx; - struct flb_log_event_encoder encoder; - struct flb_log_event_decoder decoder; - - ctx = (struct flb_tail_config *) file->config; - - ret = flb_log_event_decoder_init(&decoder, in_data, in_size); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - return -1; - } - - ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_log_event_decoder_destroy(&decoder); - - return -2; - } - - while (flb_log_event_decoder_next(&decoder, &event) == - FLB_EVENT_DECODER_SUCCESS) { - - ret = flb_log_event_encoder_begin_record(&encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp(&encoder, &event.timestamp); - } - - /* append previous map keys */ - for (i = 0; i < event.body->via.map.size; i++) { - k = event.body->via.map.ptr[i].key; - v = event.body->via.map.ptr[i].val; - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_msgpack_object( - &encoder, - &k); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_msgpack_object( - &encoder, - &v); - } - } - - /* path_key */ - if (ctx->path_key != NULL) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->config->path_key); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->orig_name); - } - } - - /* offset_key */ - if (ctx->offset_key != NULL) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_cstring( - &encoder, - file->config->offset_key); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_uint64( - &encoder, - file->offset + - file->last_processed_bytes); - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&encoder); - } - else { - flb_plg_error(file->config->ins, "error packing event : %d", ret); - - flb_log_event_encoder_rollback_record(&encoder); - } - - /* counter */ - records++; - } - - *out_data = encoder.output_buffer; - *out_size = encoder.output_length; - - /* This function transfers ownership of the internal memory allocated by - * sbuffer using msgpack_sbuffer_release which means the caller is - * responsible for releasing the memory. - */ - flb_log_event_encoder_claim_internal_buffer_ownership(&encoder); - - flb_log_event_decoder_destroy(&decoder); - flb_log_event_encoder_destroy(&encoder); - - return records; -} - -static int flb_tail_repack_map(struct flb_log_event_encoder *encoder, - char *data, - size_t data_size) -{ - msgpack_unpacked source_map; - size_t offset; - int result; - size_t index; - msgpack_object value; - msgpack_object key; - - result = FLB_EVENT_ENCODER_SUCCESS; - - if (data_size > 0) { - msgpack_unpacked_init(&source_map); - - offset = 0; - result = msgpack_unpack_next(&source_map, - data, - data_size, - &offset); - - if (result == MSGPACK_UNPACK_SUCCESS) { - result = FLB_EVENT_ENCODER_SUCCESS; - } - else { - result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; - } - - for (index = 0; - index < source_map.data.via.map.size && - result == FLB_EVENT_ENCODER_SUCCESS; - index++) { - key = source_map.data.via.map.ptr[index].key; - value = source_map.data.via.map.ptr[index].val; - - result = flb_log_event_encoder_append_body_msgpack_object( - encoder, - &key); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_msgpack_object( - encoder, - &value); - } - } - - msgpack_unpacked_destroy(&source_map); - } - - return result; -} - -int flb_tail_pack_line_map(struct flb_time *time, char **data, - size_t *data_size, struct flb_tail_file *file, - size_t processed_bytes) -{ - int result; - - result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->sl_log_event_encoder, time); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_tail_repack_map(file->sl_log_event_encoder, - *data, - *data_size); - } - - /* path_key */ - if (file->config->path_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), - FLB_LOG_EVENT_STRING_VALUE(file->orig_name, - file->orig_name_len)); - } - } - - /* offset_key */ - if (file->config->offset_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), - FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); - } - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); - } - else { - flb_log_event_encoder_rollback_record(file->sl_log_event_encoder); - } - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, "error packing event"); - - return -1; - } - - return 0; -} - -int flb_tail_file_pack_line(struct flb_time *time, char *data, size_t data_size, - struct flb_tail_file *file, size_t processed_bytes) -{ - int result; - - result = flb_log_event_encoder_begin_record(file->sl_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->sl_log_event_encoder, time); - } - - /* path_key */ - if (file->config->path_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->path_key), - FLB_LOG_EVENT_STRING_VALUE(file->orig_name, - file->orig_name_len)); - } - } - - /* offset_key */ - if (file->config->offset_key != NULL) { - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->offset_key), - FLB_LOG_EVENT_UINT64_VALUE(file->offset + processed_bytes)); - } - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_append_body_values( - file->sl_log_event_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(file->config->key), - FLB_LOG_EVENT_STRING_VALUE(data, - data_size)); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(file->sl_log_event_encoder); - } - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, "error packing event : %d", result); - - return -1; - } - - return 0; -} - -static int ml_stream_buffer_append(struct flb_tail_file *file, char *buf_data, size_t buf_size) -{ - int result; - - result = flb_log_event_encoder_emit_raw_record( - file->ml_log_event_encoder, - buf_data, buf_size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(file->config->ins, - "log event raw append error : %d", - result); - - return -1; - } - - return 0; -} - -static int ml_stream_buffer_flush(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ - if (file->ml_log_event_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, - file->tag_buf, - file->tag_len, - file->ml_log_event_encoder->output_buffer, - file->ml_log_event_encoder->output_length); - - flb_log_event_encoder_reset(file->ml_log_event_encoder); - } - - return 0; -} - -static int process_content(struct flb_tail_file *file, size_t *bytes) -{ - size_t len; - int lines = 0; - int ret; - size_t processed_bytes = 0; - char *data; - char *end; - char *p; - void *out_buf; - size_t out_size; - int crlf; - char *line; - size_t line_len; - char *repl_line; - size_t repl_line_len; - time_t now = time(NULL); - struct flb_time out_time = {0}; - struct flb_tail_config *ctx; - - ctx = (struct flb_tail_config *) file->config; - - /* Parse the data content */ - data = file->buf_data; - end = data + file->buf_len; - - /* reset last processed bytes */ - file->last_processed_bytes = 0; - - /* Skip null characters from the head (sometimes introduced by copy-truncate log rotation) */ - while (data < end && *data == '\0') { - data++; - processed_bytes++; - } - - while (data < end && (p = memchr(data, '\n', end - data))) { - len = (p - data); - crlf = 0; - if (file->skip_next == FLB_TRUE) { - data += len + 1; - processed_bytes += len + 1; - file->skip_next = FLB_FALSE; - continue; - } - - /* - * Empty line (just breakline) - * --------------------------- - * [NOTE] with the new Multiline core feature and Multiline Filter on - * Fluent Bit v1.8.2, there are a couple of cases where stack traces - * or multi line patterns expects an empty line (meaning only the - * breakline), skipping empty lines on this plugin will break that - * functionality. - * - * We are introducing 'skip_empty_lines=off' configuration - * property to revert this behavior if some user is affected by - * this change. - */ - - if (len == 0 && ctx->skip_empty_lines) { - data++; - processed_bytes++; - continue; - } - - /* Process '\r\n' */ - if (len >= 2) { - crlf = (data[len-1] == '\r'); - if (len == 1 && crlf) { - data += 2; - processed_bytes += 2; - continue; - } - } - - /* Reset time for each line */ - flb_time_zero(&out_time); - - line = data; - line_len = len - crlf; - repl_line = NULL; - - if (ctx->ml_ctx) { - ret = flb_ml_append_text(ctx->ml_ctx, - file->ml_stream_id, - &out_time, - line, - line_len); - goto go_next; - } - else if (ctx->docker_mode) { - ret = flb_tail_dmode_process_content(now, line, line_len, - &repl_line, &repl_line_len, - file, ctx); - if (ret >= 0) { - if (repl_line == line) { - repl_line = NULL; - } - else { - line = repl_line; - line_len = repl_line_len; - } - /* Skip normal parsers flow */ - goto go_next; - } - else { - flb_tail_dmode_flush(file, ctx); - } - } - -#ifdef FLB_HAVE_PARSER - if (ctx->parser) { - /* Common parser (non-multiline) */ - ret = flb_parser_do(ctx->parser, line, line_len, - &out_buf, &out_size, &out_time); - if (ret >= 0) { - if (flb_time_to_nanosec(&out_time) == 0L) { - flb_time_get(&out_time); - } - - /* If multiline is enabled, flush any buffered data */ - if (ctx->multiline == FLB_TRUE) { - flb_tail_mult_flush(file, ctx); - } - - flb_tail_pack_line_map(&out_time, - (char**) &out_buf, &out_size, file, - processed_bytes); - - flb_free(out_buf); - } - else { - /* Parser failed, pack raw text */ - flb_tail_file_pack_line(NULL, data, len, file, processed_bytes); - } - } - else if (ctx->multiline == FLB_TRUE) { - ret = flb_tail_mult_process_content(now, - line, line_len, - file, ctx, processed_bytes); - - /* No multiline */ - if (ret == FLB_TAIL_MULT_NA) { - flb_tail_mult_flush(file, ctx); - - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); - } - else if (ret == FLB_TAIL_MULT_MORE) { - /* we need more data, do nothing */ - goto go_next; - } - else if (ret == FLB_TAIL_MULT_DONE) { - /* Finalized */ - } - } - else { - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); - } -#else - flb_tail_file_pack_line(NULL, - line, line_len, file, processed_bytes); -#endif - - go_next: - flb_free(repl_line); - repl_line = NULL; - /* Adjust counters */ - data += len + 1; - processed_bytes += len + 1; - lines++; - file->parsed = 0; - file->last_processed_bytes += processed_bytes; - } - file->parsed = file->buf_len; - - if (lines > 0) { - /* Append buffer content to a chunk */ - *bytes = processed_bytes; - - if (file->sl_log_event_encoder->output_length > 0) { - flb_input_log_append_records(ctx->ins, - lines, - file->tag_buf, - file->tag_len, - file->sl_log_event_encoder->output_buffer, - file->sl_log_event_encoder->output_length); - - flb_log_event_encoder_reset(file->sl_log_event_encoder); - } - } - else if (file->skip_next) { - *bytes = file->buf_len; - } - else { - *bytes = processed_bytes; - } - - if (ctx->ml_ctx) { - ml_stream_buffer_flush(ctx, file); - } - - return lines; -} - -static inline void drop_bytes(char *buf, size_t len, int pos, int bytes) -{ - memmove(buf + pos, - buf + pos + bytes, - len - pos - bytes); -} - -#ifdef FLB_HAVE_REGEX -static void cb_results(const char *name, const char *value, - size_t vlen, void *data) -{ - struct flb_hash_table *ht = data; - - if (vlen == 0) { - return; - } - - flb_hash_table_add(ht, name, strlen(name), (void *) value, vlen); -} -#endif - -#ifdef FLB_HAVE_REGEX -static int tag_compose(char *tag, struct flb_regex *tag_regex, char *fname, - char *out_buf, size_t *out_size, - struct flb_tail_config *ctx) -#else -static int tag_compose(char *tag, char *fname, char *out_buf, size_t *out_size, - struct flb_tail_config *ctx) -#endif -{ - int i; - size_t len; - char *p; - size_t buf_s = 0; -#ifdef FLB_HAVE_REGEX - ssize_t n; - struct flb_regex_search result; - struct flb_hash_table *ht; - char *beg; - char *end; - int ret; - const char *tmp; - size_t tmp_s; -#endif - -#ifdef FLB_HAVE_REGEX - if (tag_regex) { - n = flb_regex_do(tag_regex, fname, strlen(fname), &result); - if (n <= 0) { - flb_plg_error(ctx->ins, "invalid tag_regex pattern for file %s", - fname); - return -1; - } - else { - ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, - FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE); - flb_regex_parse(tag_regex, &result, cb_results, ht); - - for (p = tag, beg = p; (beg = strchr(p, '<')); p = end + 2) { - if (beg != p) { - len = (beg - p); - memcpy(out_buf + buf_s, p, len); - buf_s += len; - } - - beg++; - - end = strchr(beg, '>'); - if (end && !memchr(beg, '<', end - beg)) { - end--; - - len = end - beg + 1; - ret = flb_hash_table_get(ht, beg, len, (void *) &tmp, &tmp_s); - if (ret != -1) { - memcpy(out_buf + buf_s, tmp, tmp_s); - buf_s += tmp_s; - } - else { - memcpy(out_buf + buf_s, "_", 1); - buf_s++; - } - } - else { - flb_plg_error(ctx->ins, - "missing closing angle bracket in tag %s " - "at position %lu", tag, beg - tag); - flb_hash_table_destroy(ht); - return -1; - } - } - - flb_hash_table_destroy(ht); - if (*p) { - len = strlen(p); - memcpy(out_buf + buf_s, p, len); - buf_s += len; - } - } - } - else { -#endif - p = strchr(tag, '*'); - if (!p) { - return -1; - } - - /* Copy tag prefix if any */ - len = (p - tag); - if (len > 0) { - memcpy(out_buf, tag, len); - buf_s += len; - } - - /* Append file name */ - len = strlen(fname); - memcpy(out_buf + buf_s, fname, len); - buf_s += len; - - /* Tag suffix (if any) */ - p++; - if (*p) { - len = strlen(tag); - memcpy(out_buf + buf_s, p, (len - (p - tag))); - buf_s += (len - (p - tag)); - } - - /* Sanitize buffer */ - for (i = 0; i < buf_s; i++) { - if (out_buf[i] == '/' || out_buf[i] == '\\' || out_buf[i] == ':') { - if (i > 0) { - out_buf[i] = '.'; - } - else { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - - if (i > 0 && out_buf[i] == '.') { - if (out_buf[i - 1] == '.') { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - else if (out_buf[i] == '*') { - drop_bytes(out_buf, buf_s, i, 1); - buf_s--; - i--; - } - } - - /* Check for an ending '.' */ - if (out_buf[buf_s - 1] == '.') { - drop_bytes(out_buf, buf_s, buf_s - 1, 1); - buf_s--; - } -#ifdef FLB_HAVE_REGEX - } -#endif - - out_buf[buf_s] = '\0'; - *out_size = buf_s; - - return 0; -} - -static inline int flb_tail_file_exists(struct stat *st, - struct flb_tail_config *ctx) -{ - int ret; - uint64_t hash; - - ret = stat_to_hash_bits(ctx, st, &hash); - if (ret != 0) { - return -1; - } - - /* static hash */ - if (flb_hash_table_exists(ctx->static_hash, hash)) { - return FLB_TRUE; - } - - /* event hash */ - if (flb_hash_table_exists(ctx->event_hash, hash)) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* - * Based in the configuration or database offset, set the proper 'offset' for the - * file in question. - */ -static int set_file_position(struct flb_tail_config *ctx, - struct flb_tail_file *file) -{ - int64_t ret; - -#ifdef FLB_HAVE_SQLDB - /* - * If the database option is enabled, try to gather the file position. The - * database function updates the file->offset entry. - */ - if (ctx->db) { - ret = flb_tail_db_file_set(file, ctx); - if (ret == 0) { - if (file->offset > 0) { - ret = lseek(file->fd, file->offset, SEEK_SET); - if (ret == -1) { - flb_errno(); - return -1; - } - } - else if (ctx->read_from_head == FLB_FALSE) { - ret = lseek(file->fd, 0, SEEK_END); - if (ret == -1) { - flb_errno(); - return -1; - } - file->offset = ret; - flb_tail_db_file_offset(file, ctx); - } - return 0; - } - } -#endif - - if (ctx->read_from_head == FLB_TRUE) { - /* no need to seek, offset position is already zero */ - return 0; - } - - /* tail... */ - ret = lseek(file->fd, 0, SEEK_END); - if (ret == -1) { - flb_errno(); - return -1; - } - file->offset = ret; - - return 0; -} - -/* Multiline flush callback: invoked every time some content is complete */ -static int ml_flush_callback(struct flb_ml_parser *parser, - struct flb_ml_stream *mst, - void *data, char *buf_data, size_t buf_size) -{ - int result; - size_t mult_size = 0; - char *mult_buf = NULL; - struct flb_tail_file *file = data; - struct flb_tail_config *ctx = file->config; - - if (ctx->path_key == NULL && ctx->offset_key == NULL) { - ml_stream_buffer_append(file, buf_data, buf_size); - } - else { - /* adjust the records in a new buffer */ - result = record_append_custom_keys(file, - buf_data, - buf_size, - &mult_buf, - &mult_size); - - if (result < 0) { - ml_stream_buffer_append(file, buf_data, buf_size); - } - else { - ml_stream_buffer_append(file, mult_buf, mult_size); - - flb_free(mult_buf); - } - } - - if (mst->forced_flush) { - ml_stream_buffer_flush(ctx, file); - } - - return 0; -} - -int flb_tail_file_append(char *path, struct stat *st, int mode, - struct flb_tail_config *ctx) -{ - int fd; - int ret; - uint64_t stream_id; - uint64_t ts; - uint64_t hash_bits; - flb_sds_t hash_key; - size_t len; - char *tag; - char *name; - size_t tag_len; - struct flb_tail_file *file; - struct stat lst; - flb_sds_t inode_str; - - if (!S_ISREG(st->st_mode)) { - return -1; - } - - if (flb_tail_file_exists(st, ctx) == FLB_TRUE) { - return -1; - } - - fd = open(path, O_RDONLY); - if (fd == -1) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot open %s", path); - return -1; - } - - file = flb_calloc(1, sizeof(struct flb_tail_file)); - if (!file) { - flb_errno(); - goto error; - } - - /* Initialize */ - file->watch_fd = -1; - file->fd = fd; - - /* On non-windows environments check if the original path is a link */ - ret = lstat(path, &lst); - if (ret == 0) { - if (S_ISLNK(lst.st_mode)) { - file->is_link = FLB_TRUE; - file->link_inode = lst.st_ino; - } - } - - /* get unique hash for this file */ - ret = stat_to_hash_bits(ctx, st, &hash_bits); - if (ret != 0) { - flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path); - goto error; - } - file->hash_bits = hash_bits; - - /* store the hash key used for hash_bits */ - ret = stat_to_hash_key(ctx, st, &hash_key); - if (ret != 0) { - flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path); - goto error; - } - file->hash_key = hash_key; - - file->inode = st->st_ino; - file->offset = 0; - file->size = st->st_size; - file->buf_len = 0; - file->parsed = 0; - file->config = ctx; - file->tail_mode = mode; - file->tag_len = 0; - file->tag_buf = NULL; - file->rotated = 0; - file->pending_bytes = 0; - file->mult_firstline = FLB_FALSE; - file->mult_keys = 0; - file->mult_flush_timeout = 0; - file->mult_skipping = FLB_FALSE; - - /* - * Duplicate string into 'file' structure, the called function - * take cares to resolve real-name of the file in case we are - * running in a non-Linux system. - * - * Depending of the operating system, the way to obtain the file - * name associated to it file descriptor can have different behaviors - * specifically if it root path it's under a symbolic link. On Linux - * we can trust the file name but in others it's better to solve it - * with some extra calls. - */ - ret = flb_tail_file_name_dup(path, file); - if (!file->name) { - flb_errno(); - goto error; - } - - /* We keep a copy of the initial filename in orig_name. This is required - * for path_key to continue working after rotation. */ - file->orig_name = flb_strdup(file->name); - if (!file->orig_name) { - flb_free(file->name); - flb_errno(); - goto error; - } - file->orig_name_len = file->name_len; - - /* multiline msgpack buffers */ - file->mult_records = 0; - msgpack_sbuffer_init(&file->mult_sbuf); - msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, - msgpack_sbuffer_write); - - /* docker mode */ - file->dmode_flush_timeout = 0; - file->dmode_complete = true; - file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); - file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); - file->dmode_firstline = false; -#ifdef FLB_HAVE_SQLDB - file->db_id = 0; -#endif - file->skip_next = FLB_FALSE; - file->skip_warn = FLB_FALSE; - - /* Multiline core mode */ - if (ctx->ml_ctx) { - /* - * Create inode str to get stream_id. - * - * If stream_id is created by filename, - * it will be same after file rotation and it causes invalid destruction: - * - * - https://github.com/fluent/fluent-bit/issues/4190 - */ - inode_str = flb_sds_create_size(64); - flb_sds_printf(&inode_str, "%"PRIu64, file->inode); - - /* Create a stream for this file */ - ret = flb_ml_stream_create(ctx->ml_ctx, - inode_str, flb_sds_len(inode_str), - ml_flush_callback, file, - &stream_id); - if (ret != 0) { - flb_plg_error(ctx->ins, - "could not create multiline stream for file: %s", - inode_str); - flb_sds_destroy(inode_str); - goto error; - } - file->ml_stream_id = stream_id; - flb_sds_destroy(inode_str); - - /* - * Multiline core file buffer: the multiline core functionality invokes a callback everytime a message is ready - * to be processed by the caller, this can be a multiline message or a message that is considered 'complete'. In - * the previous version of Tail, when it received a message this message was automatically ingested into the pipeline - * without any previous buffering which leads to performance degradation. - * - * The msgpack buffer 'ml_sbuf' keeps all ML provided records and it's flushed just when the file processor finish - * processing the "read() bytes". - */ - } - - /* Local buffer */ - file->buf_size = ctx->buf_chunk_size; - file->buf_data = flb_malloc(file->buf_size); - if (!file->buf_data) { - flb_errno(); - goto error; - } - - /* Initialize (optional) dynamic tag */ - if (ctx->dynamic_tag == FLB_TRUE) { - len = ctx->ins->tag_len + strlen(path) + 1; - tag = flb_malloc(len); - if (!tag) { - flb_errno(); - flb_plg_error(ctx->ins, "failed to allocate tag buffer"); - goto error; - } -#ifdef FLB_HAVE_REGEX - ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx); -#else - ret = tag_compose(ctx->ins->tag, path, tag, &tag_len, ctx); -#endif - if (ret == 0) { - file->tag_len = tag_len; - file->tag_buf = flb_strdup(tag); - } - flb_free(tag); - if (ret != 0) { - flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path); - goto error; - } - } - else { - file->tag_len = strlen(ctx->ins->tag); - file->tag_buf = flb_strdup(ctx->ins->tag); - } - if (!file->tag_buf) { - flb_plg_error(ctx->ins, "failed to set tag for file: %s", path); - flb_errno(); - goto error; - } - - if (mode == FLB_TAIL_STATIC) { - mk_list_add(&file->_head, &ctx->files_static); - ctx->files_static_count++; - flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - tail_signal_manager(file->config); - } - else if (mode == FLB_TAIL_EVENT) { - mk_list_add(&file->_head, &ctx->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - - /* Register this file into the fs_event monitoring */ - ret = flb_tail_fs_add(ctx, file); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not register file into fs_events"); - goto error; - } - } - - /* Set the file position (database offset, head or tail) */ - ret = set_file_position(ctx, file); - if (ret == -1) { - flb_tail_file_remove(file); - goto error; - } - - /* Remaining bytes to read */ - file->pending_bytes = file->size - file->offset; - -#ifdef FLB_HAVE_METRICS - name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); - - /* Old api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); -#endif - - file->sl_log_event_encoder = flb_log_event_encoder_create( - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (file->sl_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; - } - - file->ml_log_event_encoder = flb_log_event_encoder_create( - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (file->ml_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; - } - - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" with offset=%"PRId64" appended as %s", - file->inode, file->offset, path); - return 0; - -error: - if (file) { - if (file->buf_data) { - flb_free(file->buf_data); - } - if (file->name) { - flb_free(file->name); - } - flb_free(file); - } - close(fd); - - return -1; -} - -void flb_tail_file_remove(struct flb_tail_file *file) -{ - uint64_t ts; - char *name; - struct flb_tail_config *ctx; - - ctx = file->config; - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", - file->inode, file->name); - - /* remove the multiline.core stream */ - if (ctx->ml_ctx && file->ml_stream_id > 0) { - /* destroy ml stream */ - flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id); - } - - if (file->rotated > 0) { -#ifdef FLB_HAVE_SQLDB - /* - * Make sure to remove a the file entry from the database if the file - * was rotated and it's not longer being monitored. - */ - if (ctx->db) { - flb_tail_db_file_delete(file, file->config); - } -#endif - mk_list_del(&file->_rotate_head); - } - - msgpack_sbuffer_destroy(&file->mult_sbuf); - - if (file->sl_log_event_encoder != NULL) { - flb_log_event_encoder_destroy(file->sl_log_event_encoder); - } - - if (file->ml_log_event_encoder != NULL) { - flb_log_event_encoder_destroy(file->ml_log_event_encoder); - } - - flb_sds_destroy(file->dmode_buf); - flb_sds_destroy(file->dmode_lastline); - mk_list_del(&file->_head); - flb_tail_fs_remove(ctx, file); - - /* avoid deleting file with -1 fd */ - if (file->fd != -1) { - close(file->fd); - } - if (file->tag_buf) { - flb_free(file->tag_buf); - } - - /* remove any potential entry from the hash tables */ - flb_hash_table_del(ctx->static_hash, file->hash_key); - flb_hash_table_del(ctx->event_hash, file->hash_key); - - flb_free(file->buf_data); - flb_free(file->name); - flb_free(file->orig_name); - flb_free(file->real_name); - flb_sds_destroy(file->hash_key); - -#ifdef FLB_HAVE_METRICS - name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_closed, ts, 1, (char *[]) {name}); - - /* old api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_CLOSED, 1, ctx->ins->metrics); -#endif - - flb_free(file); -} - -int flb_tail_file_remove_all(struct flb_tail_config *ctx) -{ - int count = 0; - struct mk_list *head; - struct mk_list *tmp; - struct flb_tail_file *file; - - mk_list_foreach_safe(head, tmp, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - flb_tail_file_remove(file); - count++; - } - - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - flb_tail_file_remove(file); - count++; - } - - return count; -} - -static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) -{ - int ret; - int64_t offset; - struct stat st; - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_errno(); - return FLB_TAIL_ERROR; - } - - /* Check if the file was truncated */ - if (file->offset > st.st_size) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return FLB_TAIL_ERROR; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", - file->inode, file->name); - file->offset = offset; - file->buf_len = 0; - - /* Update offset in the database file */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif - } - else { - file->size = st.st_size; - file->pending_bytes = (st.st_size - file->offset); - } - - return FLB_TAIL_OK; -} - -int flb_tail_file_chunk(struct flb_tail_file *file) -{ - int ret; - char *tmp; - size_t size; - size_t capacity; - size_t processed_bytes; - ssize_t bytes; - struct flb_tail_config *ctx; - - /* Check if we the engine issued a pause */ - ctx = file->config; - if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { - return FLB_TAIL_BUSY; - } - - capacity = (file->buf_size - file->buf_len) - 1; - if (capacity < 1) { - /* - * If there is no more room for more data, try to increase the - * buffer under the limit of buffer_max_size. - */ - if (file->buf_size >= ctx->buf_max_size) { - if (ctx->skip_long_lines == FLB_FALSE) { - flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " - "lines are too long. Skipping file.", file->name); - return FLB_TAIL_ERROR; - } - - /* Warn the user */ - if (file->skip_warn == FLB_FALSE) { - flb_plg_warn(ctx->ins, "file=%s have long lines. " - "Skipping long lines.", file->name); - file->skip_warn = FLB_TRUE; - } - - /* Do buffer adjustments */ - file->offset += file->buf_len; - file->buf_len = 0; - file->skip_next = FLB_TRUE; - } - else { - size = file->buf_size + ctx->buf_chunk_size; - if (size > ctx->buf_max_size) { - size = ctx->buf_max_size; - } - - /* Increase the buffer size */ - tmp = flb_realloc(file->buf_data, size); - if (tmp) { - flb_plg_trace(ctx->ins, "file=%s increase buffer size " - "%lu => %lu bytes", - file->name, file->buf_size, size); - file->buf_data = tmp; - file->buf_size = size; - } - else { - flb_errno(); - flb_plg_error(ctx->ins, "cannot increase buffer size for %s, " - "skipping file.", file->name); - return FLB_TAIL_ERROR; - } - } - capacity = (file->buf_size - file->buf_len) - 1; - } - - bytes = read(file->fd, file->buf_data + file->buf_len, capacity); - if (bytes > 0) { - /* we read some data, let the content processor take care of it */ - file->buf_len += bytes; - file->buf_data[file->buf_len] = '\0'; - - /* Now that we have some data in the buffer, call the data processor - * which aims to cut lines and register the entries into the engine. - * - * The returned value is the absolute offset the file must be seek - * now. It may need to get back a few bytes at the beginning of a new - * line. - */ - ret = process_content(file, &processed_bytes); - if (ret < 0) { - flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", - file->inode, file->name); - return FLB_TAIL_ERROR; - } - - /* Adjust the file offset and buffer */ - file->offset += processed_bytes; - consume_bytes(file->buf_data, processed_bytes, file->buf_len); - file->buf_len -= processed_bytes; - file->buf_data[file->buf_len] = '\0'; - -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - flb_tail_db_file_offset(file, file->config); - } -#endif - - /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ - ret = adjust_counters(ctx, file); - - /* Data was consumed but likely some bytes still remain */ - return ret; - } - else if (bytes == 0) { - /* We reached the end of file, let's wait for some incoming data */ - ret = adjust_counters(ctx, file); - if (ret == FLB_TAIL_OK) { - return FLB_TAIL_WAIT; - } - else { - return FLB_TAIL_ERROR; - } - } - else { - /* error */ - flb_errno(); - flb_plg_error(ctx->ins, "error reading %s", file->name); - return FLB_TAIL_ERROR; - } - - return FLB_TAIL_ERROR; -} - -/* Returns FLB_TRUE if a file has been rotated, otherwise FLB_FALSE */ -int flb_tail_file_is_rotated(struct flb_tail_config *ctx, - struct flb_tail_file *file) -{ - int ret; - char *name; - struct stat st; - - /* - * Do not double-check already rotated files since the caller of this - * function will trigger a rotation. - */ - if (file->rotated != 0) { - return FLB_FALSE; - } - - /* Check if the 'original monitored file' is a link and rotated */ - if (file->is_link == FLB_TRUE) { - ret = lstat(file->name, &st); - if (ret == -1) { - /* Broken link or missing file */ - if (errno == ENOENT) { - flb_plg_info(ctx->ins, "inode=%"PRIu64" link_rotated: %s", - file->link_inode, file->name); - return FLB_TRUE; - } - else { - flb_errno(); - flb_plg_error(ctx->ins, - "link_inode=%"PRIu64" cannot detect if file: %s", - file->link_inode, file->name); - return -1; - } - } - else { - /* The file name is there, check if the same that we have */ - if (st.st_ino != file->link_inode) { - return FLB_TRUE; - } - } - } - - /* Retrieve the real file name, operating system lookup */ - name = flb_tail_file_name(file); - if (!name) { - flb_plg_error(ctx->ins, - "inode=%"PRIu64" cannot detect if file was rotated: %s", - file->inode, file->name); - return -1; - } - - - /* Get stats from the file name */ - ret = stat(name, &st); - if (ret == -1) { - flb_errno(); - flb_free(name); - return -1; - } - - /* Compare inodes and names */ - if (file->inode == st.st_ino && - flb_tail_target_file_name_cmp(name, file) == 0) { - flb_free(name); - return FLB_FALSE; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated: %s => %s", - file->inode, file->name, name); - - flb_free(name); - return FLB_TRUE; -} - -/* Promote a event in the static list to the dynamic 'events' interface */ -int flb_tail_file_to_event(struct flb_tail_file *file) -{ - int ret; - struct stat st; - struct flb_tail_config *ctx = file->config; - - /* Check if the file promoted have pending bytes */ - ret = fstat(file->fd, &st); - if (ret != 0) { - flb_errno(); - return -1; - } - - if (file->offset < st.st_size) { - file->pending_bytes = (st.st_size - file->offset); - tail_signal_pending(file->config); - } - else { - file->pending_bytes = 0; - } - - /* Check if the file has been rotated */ - ret = flb_tail_file_is_rotated(ctx, file); - if (ret == FLB_TRUE) { - flb_tail_file_rotated(file); - } - - /* Notify the fs-event handler that we will start monitoring this 'file' */ - ret = flb_tail_fs_add(ctx, file); - if (ret == -1) { - return -1; - } - - /* List swap: change from 'static' to 'event' list */ - mk_list_del(&file->_head); - ctx->files_static_count--; - flb_hash_table_del(ctx->static_hash, file->hash_key); - - mk_list_add(&file->_head, &file->config->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); - - file->tail_mode = FLB_TAIL_EVENT; - - return 0; -} - -/* - * Given an open file descriptor, return the filename. This function is a - * bit slow and it aims to be used only when a file is rotated. - */ -char *flb_tail_file_name(struct flb_tail_file *file) -{ - int ret; - char *buf; -#ifdef __linux__ - ssize_t s; - char tmp[128]; -#elif defined(__APPLE__) - char path[PATH_MAX]; -#elif defined(FLB_SYSTEM_WINDOWS) - HANDLE h; -#elif defined(FLB_SYSTEM_FREEBSD) - struct kinfo_file *file_entries; - int file_count; - int file_index; -#endif - - buf = flb_malloc(PATH_MAX); - if (!buf) { - flb_errno(); - return NULL; - } - -#ifdef __linux__ - ret = snprintf(tmp, sizeof(tmp) - 1, "/proc/%i/fd/%i", getpid(), file->fd); - if (ret == -1) { - flb_errno(); - flb_free(buf); - return NULL; - } - - s = readlink(tmp, buf, PATH_MAX); - if (s == -1) { - flb_free(buf); - flb_errno(); - return NULL; - } - buf[s] = '\0'; - -#elif __APPLE__ - int len; - - ret = fcntl(file->fd, F_GETPATH, path); - if (ret == -1) { - flb_errno(); - flb_free(buf); - return NULL; - } - - len = strlen(path); - memcpy(buf, path, len); - buf[len] = '\0'; - -#elif defined(FLB_SYSTEM_WINDOWS) - int len; - - h = (HANDLE) _get_osfhandle(file->fd); - if (h == INVALID_HANDLE_VALUE) { - flb_errno(); - flb_free(buf); - return NULL; - } - - /* This function returns the length of the string excluding "\0" - * and the resulting path has a "\\?\" prefix. - */ - len = GetFinalPathNameByHandleA(h, buf, PATH_MAX, FILE_NAME_NORMALIZED); - if (len == 0 || len >= PATH_MAX) { - flb_free(buf); - return NULL; - } - - if (strstr(buf, "\\\\?\\")) { - memmove(buf, buf + 4, len + 1); - } -#elif defined(FLB_SYSTEM_FREEBSD) - if ((file_entries = kinfo_getfile(getpid(), &file_count)) == NULL) { - flb_free(buf); - return NULL; - } - - for (file_index=0; file_index < file_count; file_index++) { - if (file_entries[file_index].kf_fd == file->fd) { - strncpy(buf, file_entries[file_index].kf_path, PATH_MAX - 1); - buf[PATH_MAX - 1] = 0; - break; - } - } - free(file_entries); -#endif - return buf; -} - -int flb_tail_file_name_dup(char *path, struct flb_tail_file *file) -{ - file->name = flb_strdup(path); - if (!file->name) { - flb_errno(); - return -1; - } - file->name_len = strlen(file->name); - - if (file->real_name) { - flb_free(file->real_name); - } - - file->real_name = flb_tail_file_name(file); - if (!file->real_name) { - flb_errno(); - flb_free(file->name); - file->name = NULL; - return -1; - } - - return 0; -} - -/* Invoked every time a file was rotated */ -int flb_tail_file_rotated(struct flb_tail_file *file) -{ - int ret; - uint64_t ts; - char *name; - char *i_name; - char *tmp; - struct stat st; - struct flb_tail_config *ctx = file->config; - - /* Get the new file name */ - name = flb_tail_file_name(file); - if (!name) { - return -1; - } - - flb_plg_debug(ctx->ins, "inode=%"PRIu64" rotated %s -> %s", - file->inode, file->name, name); - - /* Update local file entry */ - tmp = file->name; - flb_tail_file_name_dup(name, file); - flb_plg_info(ctx->ins, "inode=%"PRIu64" handle rotation(): %s => %s", - file->inode, tmp, file->name); - if (file->rotated == 0) { - file->rotated = time(NULL); - mk_list_add(&file->_rotate_head, &file->config->files_rotated); - - /* Rotate the file in the database */ -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - ret = flb_tail_db_file_rotate(name, file, file->config); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not rotate file %s->%s in database", - file->name, name); - } - } -#endif - -#ifdef FLB_HAVE_METRICS - i_name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_rotated, ts, 1, (char *[]) {i_name}); - - /* OLD api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_ROTATED, - 1, file->config->ins->metrics); -#endif - - /* Check if a new file has been created */ - ret = stat(tmp, &st); - if (ret == 0 && st.st_ino != file->inode) { - if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) { - ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx); - if (ret == -1) { - flb_tail_scan(ctx->path_list, ctx); - } - else { - tail_signal_manager(file->config); - } - } - } - } - flb_free(tmp); - flb_free(name); - - return 0; -} - -static int check_purge_deleted_file(struct flb_tail_config *ctx, - struct flb_tail_file *file, time_t ts) -{ - int ret; - struct stat st; - - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); - flb_tail_file_remove(file); - return FLB_TRUE; - } - - if (st.st_nlink == 0) { - flb_plg_debug(ctx->ins, "purge: monitored file has been deleted: %s", - file->name); -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - /* Remove file entry from the database */ - flb_tail_db_file_delete(file, file->config); - } -#endif - /* Remove file from the monitored list */ - flb_tail_file_remove(file); - return FLB_TRUE; - } - - return FLB_FALSE; -} - -/* Purge rotated and deleted files */ -int flb_tail_file_purge(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - int ret; - int count = 0; - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - time_t now; - struct stat st; - - /* Rotated files */ - now = time(NULL); - mk_list_foreach_safe(head, tmp, &ctx->files_rotated) { - file = mk_list_entry(head, struct flb_tail_file, _rotate_head); - if ((file->rotated + ctx->rotate_wait) <= now) { - ret = fstat(file->fd, &st); - if (ret == 0) { - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" purge rotated file %s " \ - "(offset=%"PRId64" / size = %"PRIu64")", - file->inode, file->name, file->offset, (uint64_t)st.st_size); - if (file->pending_bytes > 0 && flb_input_buf_paused(ins)) { - flb_plg_warn(ctx->ins, "purged rotated file while data " - "ingestion is paused, consider increasing " - "rotate_wait"); - } - } - else { - flb_plg_debug(ctx->ins, - "inode=%"PRIu64" purge rotated file %s (offset=%"PRId64")", - file->inode, file->name, file->offset); - } - - flb_tail_file_remove(file); - count++; - } - } - - /* - * Deleted files: under high load scenarios, exists the chances that in - * our event loop we miss some notifications about a file. In order to - * sanitize our list of monitored files we will iterate all of them and check - * if they have been deleted or not. - */ - mk_list_foreach_safe(head, tmp, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - check_purge_deleted_file(ctx, file, now); - } - mk_list_foreach_safe(head, tmp, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - check_purge_deleted_file(ctx, file, now); - } - - return count; -} |