diff options
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_config.c')
-rw-r--r-- | fluent-bit/plugins/in_tail/tail_config.c | 472 |
1 files changed, 472 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_config.c b/fluent-bit/plugins/in_tail/tail_config.c new file mode 100644 index 000000000..360b3dbea --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_config.c @@ -0,0 +1,472 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#include <stdlib.h> +#include <fcntl.h> + +#include "tail_fs.h" +#include "tail_db.h" +#include "tail_config.h" +#include "tail_scan.h" +#include "tail_sql.h" +#include "tail_dockermode.h" + +#ifdef FLB_HAVE_PARSER +#include "tail_multiline.h" +#endif + +static int multiline_load_parsers(struct flb_tail_config *ctx) +{ + struct mk_list *head; + struct mk_list *head_p; + struct flb_config_map_val *mv; + struct flb_slist_entry *val = NULL; + struct flb_ml_parser_ins *parser_i; + + if (!ctx->multiline_parsers) { + return 0; + } + + /* Create Multiline context using the plugin instance name */ + ctx->ml_ctx = flb_ml_create(ctx->config, ctx->ins->name); + if (!ctx->ml_ctx) { + return -1; + } + + /* + * Iterate all 'multiline.parser' entries. Every entry is considered + * a group which can have multiple multiline parser instances. + */ + flb_config_map_foreach(head, mv, ctx->multiline_parsers) { + mk_list_foreach(head_p, mv->val.list) { + val = mk_list_entry(head_p, struct flb_slist_entry, _head); + + /* Create an instance of the defined parser */ + parser_i = flb_ml_parser_instance_create(ctx->ml_ctx, val->str); + if (!parser_i) { + return -1; + } + } + } + + return 0; +} + +struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + int sec; + int i; + long nsec; + const char *tmp; + struct flb_tail_config *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_tail_config)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->config = config; + ctx->ins = ins; + ctx->ignore_older = 0; + ctx->skip_long_lines = FLB_FALSE; +#ifdef FLB_HAVE_SQLDB + ctx->db_sync = 1; /* sqlite sync 'normal' */ +#endif + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Create the channel manager */ + ret = flb_pipe_create(ctx->ch_manager); + if (ret == -1) { + flb_errno(); + flb_free(ctx); + return NULL; + } + ctx->ch_reads = 0; + ctx->ch_writes = 0; + + /* Create the pending channel */ + ret = flb_pipe_create(ctx->ch_pending); + if (ret == -1) { + flb_errno(); + flb_tail_config_destroy(ctx); + return NULL; + } + /* Make pending channel non-blocking */ + for (i = 0; i <= 1; i++) { + ret = flb_pipe_set_nonblocking(ctx->ch_pending[i]); + if (ret == -1) { + flb_errno(); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Config: path/pattern to read files */ + if (!ctx->path_list || mk_list_size(ctx->path_list) == 0) { + flb_plg_error(ctx->ins, "no input 'path' was given"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* Config: seconds interval before to re-scan the path */ + tmp = flb_input_get_property("refresh_interval", ins); + if (!tmp) { + ctx->refresh_interval_sec = FLB_TAIL_REFRESH; + ctx->refresh_interval_nsec = 0; + } + else { + ret = flb_utils_time_split(tmp, &sec, &nsec); + if (ret == 0) { + ctx->refresh_interval_sec = sec; + ctx->refresh_interval_nsec = nsec; + + if (sec == 0 && nsec == 0) { + flb_plg_error(ctx->ins, "invalid 'refresh_interval' config " + "value (%s)", tmp); + flb_free(ctx); + return NULL; + } + + if (sec == 0 && nsec <= 1000000) { + flb_plg_warn(ctx->ins, "very low refresh_interval " + "(%i.%lu nanoseconds) might cause high CPU usage", + sec, nsec); + } + } + else { + flb_plg_error(ctx->ins, + "invalid 'refresh_interval' config value (%s)", + tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Config: seconds interval to monitor file after rotation */ + if (ctx->rotate_wait <= 0) { + flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value"); + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_PARSER + /* Config: multi-line support */ + if (ctx->multiline == FLB_TRUE) { + ret = flb_tail_mult_create(ctx, ins, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return NULL; + } + } +#endif + + /* Config: Docker mode */ + if(ctx->docker_mode == FLB_TRUE) { + ret = flb_tail_dmode_create(ctx, ins, config); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Validate buffer limit */ + if (ctx->buf_chunk_size > ctx->buf_max_size) { + flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk"); + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_REGEX + /* Parser / Format */ + tmp = flb_input_get_property("parser", ins); + if (tmp) { + ctx->parser = flb_parser_get(tmp, config); + if (!ctx->parser) { + flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); + } + } +#endif + + mk_list_init(&ctx->files_static); + mk_list_init(&ctx->files_event); + mk_list_init(&ctx->files_rotated); + + /* hash table for files lookups */ + ctx->static_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); + if (!ctx->static_hash) { + flb_plg_error(ctx->ins, "could not create static hash"); + flb_tail_config_destroy(ctx); + return NULL; + } + + ctx->event_hash = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0); + if (!ctx->event_hash) { + flb_plg_error(ctx->ins, "could not create event hash"); + flb_tail_config_destroy(ctx); + return NULL; + } + +#ifdef FLB_HAVE_SQLDB + ctx->db = NULL; +#endif + +#ifdef FLB_HAVE_REGEX + tmp = flb_input_get_property("tag_regex", ins); + if (tmp) { + ctx->tag_regex = flb_regex_create(tmp); + if (ctx->tag_regex) { + ctx->dynamic_tag = FLB_TRUE; + } + else { + flb_plg_error(ctx->ins, "invalid 'tag_regex' config value"); + } + } + else { + ctx->tag_regex = NULL; + } +#endif + + /* Check if it should use dynamic tags */ + tmp = strchr(ins->tag, '*'); + if (tmp) { + ctx->dynamic_tag = FLB_TRUE; + } + +#ifdef FLB_HAVE_SQLDB + /* Database options (needs to be set before the context) */ + tmp = flb_input_get_property("db.sync", ins); + if (tmp) { + if (strcasecmp(tmp, "extra") == 0) { + ctx->db_sync = 3; + } + else if (strcasecmp(tmp, "full") == 0) { + ctx->db_sync = 2; + } + else if (strcasecmp(tmp, "normal") == 0) { + ctx->db_sync = 1; + } + else if (strcasecmp(tmp, "off") == 0) { + ctx->db_sync = 0; + } + else { + flb_plg_error(ctx->ins, "invalid database 'db.sync' value"); + } + } + + /* Initialize database */ + tmp = flb_input_get_property("db", ins); + if (tmp) { + ctx->db = flb_tail_db_open(tmp, ins, ctx, config); + if (!ctx->db) { + flb_plg_error(ctx->ins, "could not open/create database"); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Journal mode check */ + tmp = flb_input_get_property("db.journal_mode", ins); + if (tmp) { + if (strcasecmp(tmp, "DELETE") != 0 && + strcasecmp(tmp, "TRUNCATE") != 0 && + strcasecmp(tmp, "PERSIST") != 0 && + strcasecmp(tmp, "MEMORY") != 0 && + strcasecmp(tmp, "WAL") != 0 && + strcasecmp(tmp, "OFF") != 0) { + + flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + /* Prepare Statement */ + if (ctx->db) { + /* SQL_GET_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_GET_FILE, + -1, + &ctx->stmt_get_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_INSERT_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_INSERT_FILE, + -1, + &ctx->stmt_insert_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_ROTATE_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_ROTATE_FILE, + -1, + &ctx->stmt_rotate_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_UPDATE_OFFSET */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_UPDATE_OFFSET, + -1, + &ctx->stmt_offset, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* SQL_DELETE_FILE */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_DELETE_FILE, + -1, + &ctx->stmt_delete_file, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_tail_config_destroy(ctx); + return NULL; + } + + } +#endif + +#ifdef FLB_HAVE_PARSER + /* Multiline core API */ + if (ctx->multiline_parsers && mk_list_size(ctx->multiline_parsers) > 0) { + ret = multiline_load_parsers(ctx); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not load multiline parsers"); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* Enable auto-flush routine */ + ret = flb_ml_auto_flush_init(ctx->ml_ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not start multiline auto-flush"); + flb_tail_config_destroy(ctx); + return NULL; + } + flb_plg_info(ctx->ins, "multiline core started"); + } +#endif + +#ifdef FLB_HAVE_METRICS + ctx->cmt_files_opened = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_opened_total", + "Total number of opened files", + 1, (char *[]) {"name"}); + + ctx->cmt_files_closed = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_closed_total", + "Total number of closed files", + 1, (char *[]) {"name"}); + + ctx->cmt_files_rotated = cmt_counter_create(ins->cmt, + "fluentbit", "input", + "files_rotated_total", + "Total number of rotated files", + 1, (char *[]) {"name"}); + + /* OLD metrics */ + flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, + "files_opened", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED, + "files_closed", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED, + "files_rotated", ctx->ins->metrics); +#endif + + return ctx; +} + +int flb_tail_config_destroy(struct flb_tail_config *config) +{ + +#ifdef FLB_HAVE_PARSER + flb_tail_mult_destroy(config); + + if (config->ml_ctx) { + flb_ml_destroy(config->ml_ctx); + } +#endif + + /* Close pipe ends */ + flb_pipe_close(config->ch_manager[0]); + flb_pipe_close(config->ch_manager[1]); + flb_pipe_close(config->ch_pending[0]); + flb_pipe_close(config->ch_pending[1]); + +#ifdef FLB_HAVE_REGEX + if (config->tag_regex) { + flb_regex_destroy(config->tag_regex); + } +#endif + +#ifdef FLB_HAVE_SQLDB + if (config->db != NULL) { + sqlite3_finalize(config->stmt_get_file); + sqlite3_finalize(config->stmt_insert_file); + sqlite3_finalize(config->stmt_delete_file); + sqlite3_finalize(config->stmt_rotate_file); + sqlite3_finalize(config->stmt_offset); + flb_tail_db_close(config->db); + } +#endif + + if (config->static_hash) { + flb_hash_table_destroy(config->static_hash); + } + if (config->event_hash) { + flb_hash_table_destroy(config->event_hash); + } + + flb_free(config); + return 0; +} |