diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/plugins/in_systemd | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_systemd')
-rw-r--r-- | fluent-bit/plugins/in_systemd/CMakeLists.txt | 11 | ||||
-rw-r--r-- | fluent-bit/plugins/in_systemd/systemd.c | 555 | ||||
-rw-r--r-- | fluent-bit/plugins/in_systemd/systemd_config.c | 314 | ||||
-rw-r--r-- | fluent-bit/plugins/in_systemd/systemd_config.h | 82 | ||||
-rw-r--r-- | fluent-bit/plugins/in_systemd/systemd_db.c | 197 | ||||
-rw-r--r-- | fluent-bit/plugins/in_systemd/systemd_db.h | 64 |
6 files changed, 1223 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_systemd/CMakeLists.txt b/fluent-bit/plugins/in_systemd/CMakeLists.txt new file mode 100644 index 00000000..3e52d52b --- /dev/null +++ b/fluent-bit/plugins/in_systemd/CMakeLists.txt @@ -0,0 +1,11 @@ +set(src + systemd_config.c + systemd.c) + +if(FLB_SQLDB) +set(src + ${src} + systemd_db.c) +endif() + +FLB_PLUGIN(in_systemd "${src}" ${JOURNALD_LIBRARIES}) diff --git a/fluent-bit/plugins/in_systemd/systemd.c b/fluent-bit/plugins/in_systemd/systemd.c new file mode 100644 index 00000000..02f81144 --- /dev/null +++ b/fluent-bit/plugins/in_systemd/systemd.c @@ -0,0 +1,555 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_time.h> + +#include "systemd_config.h" +#include "systemd_db.h" + +#include <ctype.h> + +/* msgpack helpers to pack unsigned ints (it takes care of endianness */ +#define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) d) +#define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) d) + +/* tag composer */ +static int tag_compose(const char *tag, const char *unit_name, + int unit_size, char **out_buf, size_t *out_size) +{ + int len; + const char *p; + char *buf = *out_buf; + size_t buf_s = 0; + + p = strchr(tag, '*'); + if (!p) { + return -1; + } + + /* Copy tag prefix if any */ + len = (p - tag); + if (len > 0) { + memcpy(buf, tag, len); + buf_s += len; + } + + /* Append file name */ + memcpy(buf + buf_s, unit_name, unit_size); + buf_s += unit_size; + + /* Tag suffix (if any) */ + p++; + if (*p) { + len = strlen(tag); + memcpy(buf + buf_s, p, (len - (p - tag))); + buf_s += (len - (p - tag)); + } + + buf[buf_s] = '\0'; + *out_size = buf_s; + + return 0; +} + +static int in_systemd_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + int ret_j; + int i; + int len; + int entries = 0; + int skip_entries = 0; + int rows = 0; + time_t sec; + long nsec; + uint64_t usec; + size_t length; + size_t threshold; + const char *sep; + const char *key; + const char *val; + char *buf = NULL; +#ifdef FLB_HAVE_SQLDB + char *cursor = NULL; +#endif + char *tag = NULL; + char new_tag[PATH_MAX]; + char last_tag[PATH_MAX] = {0}; + size_t tag_len; + size_t last_tag_len = 0; + const void *data; + struct flb_systemd_config *ctx = in_context; + struct flb_time tm; + + /* Restricted by mem_buf_limit */ + if (flb_input_buf_paused(ins) == FLB_TRUE) { + return FLB_SYSTEMD_BUSY; + } + + /* + * if there are not pending records from a previous round, likely we got + * some changes in the journal, otherwise go ahead and continue reading + * the journal. + */ + if (ctx->pending_records == FLB_FALSE) { + ret = sd_journal_process(ctx->j); + if (ret == SD_JOURNAL_INVALIDATE) { + flb_plg_debug(ctx->ins, + "received event on added or removed journal file"); + } + if (ret != SD_JOURNAL_APPEND && ret != SD_JOURNAL_NOP) { + return FLB_SYSTEMD_NONE; + } + } + + if (ctx->lowercase == FLB_TRUE) { + ret = sd_journal_get_data_threshold(ctx->j, &threshold); + if (ret != 0) { + flb_plg_error(ctx->ins, + "error setting up systemd data. " + "sd_journal_get_data_threshold() return value '%i'", + ret); + return FLB_SYSTEMD_ERROR; + } + } + + while ((ret_j = sd_journal_next(ctx->j)) > 0) { + /* If the tag is composed dynamically, gather the Systemd Unit name */ + if (ctx->dynamic_tag) { + ret = sd_journal_get_data(ctx->j, "_SYSTEMD_UNIT", &data, &length); + if (ret == 0) { + tag = new_tag; + tag_compose(ctx->ins->tag, (const char *) data + 14, length - 14, + &tag, &tag_len); + } + else { + tag = new_tag; + tag_compose(ctx->ins->tag, + FLB_SYSTEMD_UNKNOWN, sizeof(FLB_SYSTEMD_UNKNOWN) - 1, + &tag, &tag_len); + } + } + else { + tag = ctx->ins->tag; + tag_len = ctx->ins->tag_len; + } + + if (last_tag_len == 0) { + strncpy(last_tag, tag, tag_len); + last_tag_len = tag_len; + } + + /* Set time */ + ret = sd_journal_get_realtime_usec(ctx->j, &usec); + if (ret != 0) { + flb_plg_error(ctx->ins, + "error reading from systemd journal. " + "sd_journal_get_realtime_usec() return value '%i'", + ret); + /* It seems the journal file was deleted (rotated). */ + ret_j = -1; + break; + } + sec = usec / 1000000; + nsec = (usec % 1000000) * 1000; + flb_time_set(&tm, sec, nsec); + + /* + * The new incoming record can have a different tag than previous one, + * so a new msgpack buffer is required. We ingest the data and prepare + * a new buffer. + */ + if (ctx->log_encoder->output_length > 0 && + ((last_tag_len != tag_len) || + (strncmp(last_tag, tag, tag_len) != 0))) { + flb_input_log_append(ctx->ins, + last_tag, last_tag_len, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + + flb_log_event_encoder_reset(ctx->log_encoder); + + strncpy(last_tag, tag, tag_len); + last_tag_len = tag_len; + } + + + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); + } + + /* Pack every field in the entry */ + entries = 0; + skip_entries = 0; + while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 && + entries < ctx->max_fields) { + key = (const char *) data; + if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') { + key++; + length--; + } + + sep = strchr(key, '='); + if (sep == NULL) { + skip_entries++; + continue; + } + + len = (sep - key); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, len); + } + + if (ctx->lowercase == FLB_TRUE) { + /* + * Ensure buf to have enough space for the key because the libsystemd + * might return larger data than the threshold. + */ + if (buf == NULL) { + buf = flb_sds_create_len(NULL, threshold); + } + if (flb_sds_alloc(buf) < len) { + buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); + } + for (i = 0; i < len; i++) { + buf[i] = tolower(key[i]); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, buf, len); + } + } + else { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, (char *) key, len); + } + } + + val = sep + 1; + len = length - (sep - key) - 1; + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, (char *) val, len); + } + + entries++; + } + rows++; + + if (skip_entries > 0) { + flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(ctx->log_encoder); + } + + /* + * Some journals can have too much data, pause if we have processed + * more than 1MB. Journal will resume later. + */ + if (ctx->log_encoder->output_length > 1024000) { + flb_input_log_append(ctx->ins, + tag, tag_len, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + + flb_log_event_encoder_reset(ctx->log_encoder); + + strncpy(last_tag, tag, tag_len); + last_tag_len = tag_len; + + break; + } + + if (rows >= ctx->max_entries) { + break; + } + } + + flb_sds_destroy(buf); + +#ifdef FLB_HAVE_SQLDB + /* Save cursor */ + if (ctx->db) { + sd_journal_get_cursor(ctx->j, &cursor); + if (cursor) { + flb_systemd_db_set_cursor(ctx, cursor); + flb_free(cursor); + } + } +#endif + + /* Write any pending data into the buffer */ + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, + tag, tag_len, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + + flb_log_event_encoder_reset(ctx->log_encoder); + } + + /* the journal is empty, no more records */ + if (ret_j == 0) { + ctx->pending_records = FLB_FALSE; + return FLB_SYSTEMD_OK; + } + else if (ret_j > 0) { + /* + * ret_j == 1, but the loop was broken due to some special condition like + * buffer size limit or it reach the max number of rows that it supposed to + * process on this call. Assume there are pending records. + */ + ctx->pending_records = FLB_TRUE; + return FLB_SYSTEMD_MORE; + } + else { + /* Supposedly, current cursor points to a deleted file. + * Re-seeking to the first journal entry. + * Other failures, such as disk read error, would still lead to infinite loop there, + * but at least FLB log will be full of errors. */ + ret = sd_journal_seek_head(ctx->j); + flb_plg_error(ctx->ins, + "sd_journal_next() returned error %i; " + "journal is re-opened, unread logs are lost; " + "sd_journal_seek_head() returned %i", ret_j, ret); + ctx->pending_records = FLB_TRUE; + return FLB_SYSTEMD_ERROR; + } +} + +static int in_systemd_collect_archive(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret; + uint64_t val; + ssize_t bytes; + struct flb_systemd_config *ctx = in_context; + + bytes = read(ctx->ch_manager[0], &val, sizeof(uint64_t)); + if (bytes == -1) { + flb_errno(); + return -1; + } + + ret = in_systemd_collect(ins, config, in_context); + if (ret == FLB_SYSTEMD_OK) { + /* Events collector: journald events */ + ret = flb_input_set_collector_event(ins, + in_systemd_collect, + ctx->fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "error setting up collector events"); + flb_systemd_config_destroy(ctx); + return -1; + } + ctx->coll_fd_journal = ret; + flb_input_collector_start(ctx->coll_fd_journal, ins); + + /* Timer to collect pending events */ + ret = flb_input_set_collector_time(ins, + in_systemd_collect, + 1, 0, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, + "error setting up collector for pending events"); + flb_systemd_config_destroy(ctx); + return -1; + } + ctx->coll_fd_pending = ret; + flb_input_collector_start(ctx->coll_fd_pending, ins); + + return 0; + } + + /* If FLB_SYSTEMD_NONE or FLB_SYSTEMD_MORE, keep trying */ + write(ctx->ch_manager[1], &val, sizeof(uint64_t)); + + return 0; +} + +static int in_systemd_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + int ret; + struct flb_systemd_config *ctx; + + ctx = flb_systemd_config_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "cannot initialize"); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + /* Events collector: archive */ + ret = flb_input_set_collector_event(ins, in_systemd_collect_archive, + ctx->ch_manager[0], config); + if (ret == -1) { + flb_systemd_config_destroy(ctx); + return -1; + } + ctx->coll_fd_archive = ret; + + return 0; +} + +static int in_systemd_pre_run(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int n; + uint64_t val = 0xc002; + struct flb_systemd_config *ctx = in_context; + (void) ins; + (void) config; + + /* Insert a dummy event into the channel manager */ + n = write(ctx->ch_manager[1], &val, sizeof(val)); + if (n == -1) { + flb_errno(); + return -1; + } + + return n; +} + +static void in_systemd_pause(void *data, struct flb_config *config) +{ + int ret; + struct flb_systemd_config *ctx = data; + + flb_input_collector_pause(ctx->coll_fd_archive, ctx->ins); + + /* pause only if it's running */ + ret = flb_input_collector_running(ctx->coll_fd_journal, ctx->ins); + if (ret == FLB_TRUE) { + flb_input_collector_pause(ctx->coll_fd_journal, ctx->ins); + flb_input_collector_pause(ctx->coll_fd_pending, ctx->ins); + } +} + +static void in_systemd_resume(void *data, struct flb_config *config) +{ + int ret; + struct flb_systemd_config *ctx = data; + + flb_input_collector_resume(ctx->coll_fd_archive, ctx->ins); + + /* resume only if is not running */ + ret = flb_input_collector_running(ctx->coll_fd_journal, ctx->ins); + if (ret == FLB_FALSE) { + flb_input_collector_resume(ctx->coll_fd_journal, ctx->ins); + flb_input_collector_resume(ctx->coll_fd_pending, ctx->ins); + } +} + +static int in_systemd_exit(void *data, struct flb_config *config) +{ + (void) *config; + struct flb_systemd_config *ctx = data; + + flb_systemd_config_destroy(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "path", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, path), + "Set the systemd journal path" + }, + { + FLB_CONFIG_MAP_INT, "max_fields", FLB_SYSTEMD_MAX_FIELDS, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, max_fields), + "Set the maximum fields per notification" + }, + { + FLB_CONFIG_MAP_INT, "max_entries", FLB_SYSTEMD_MAX_ENTRIES, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, max_entries), + "Set the maximum entries per notification" + }, + { + FLB_CONFIG_MAP_STR, "systemd_filter_type", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, filter_type), + "Set the systemd filter type to either 'and' or 'or'" + }, + { + FLB_CONFIG_MAP_STR, "systemd_filter", (char *)NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_systemd_config, systemd_filters), + "Add a systemd filter, can be set multiple times" + }, + { + FLB_CONFIG_MAP_BOOL, "read_from_tail", "false", + 0, FLB_TRUE, offsetof(struct flb_systemd_config, read_from_tail), + "Read the journal from the end (tail)" + }, + { + FLB_CONFIG_MAP_BOOL, "lowercase", "false", + 0, FLB_TRUE, offsetof(struct flb_systemd_config, lowercase), + "Lowercase the fields" + }, + { + FLB_CONFIG_MAP_BOOL, "strip_underscores", "false", + 0, FLB_TRUE, offsetof(struct flb_systemd_config, strip_underscores), + "Strip undersecores from fields" + }, +#ifdef FLB_HAVE_SQLDB + { + FLB_CONFIG_MAP_STR, "db.sync", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, db_sync_mode), + "Set the database sync mode: extra, full, normal or off" + }, + { + FLB_CONFIG_MAP_STR, "db", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_systemd_config, db_path), + "Set the database path" + }, +#endif /* FLB_HAVE_SQLDB */ + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_systemd_plugin = { + .name = "systemd", + .description = "Systemd (Journal) reader", + .cb_init = in_systemd_init, + .cb_pre_run = in_systemd_pre_run, + .cb_flush_buf = NULL, + .cb_pause = in_systemd_pause, + .cb_resume = in_systemd_resume, + .cb_exit = in_systemd_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/fluent-bit/plugins/in_systemd/systemd_config.c b/fluent-bit/plugins/in_systemd/systemd_config.c new file mode 100644 index 00000000..57c13a85 --- /dev/null +++ b/fluent-bit/plugins/in_systemd/systemd_config.c @@ -0,0 +1,314 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_kv.h> + +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> + +#ifdef FLB_HAVE_SQLDB +#include "systemd_db.h" +#endif + +#include "systemd_config.h" + +struct flb_systemd_config *flb_systemd_config_create(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + char *cursor = NULL; + struct stat st; + struct mk_list *head; + struct flb_systemd_config *ctx; + int journal_filter_is_and; + size_t size; + struct flb_config_map_val *mv; + + + /* Allocate space for the configuration */ + ctx = flb_calloc(1, sizeof(struct flb_systemd_config)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; +#ifdef FLB_HAVE_SQLDB + ctx->db_sync = -1; +#endif + + /* Load the config_map */ + ret = flb_input_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration"); + flb_free(config); + return NULL; + } + + /* Create the channel manager */ + ret = pipe(ctx->ch_manager); + if (ret == -1) { + flb_errno(); + flb_free(ctx); + return NULL; + } + + /* Config: path */ + if (ctx->path) { + ret = stat(ctx->path, &st); + if (ret == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "given path %s is invalid", ctx->path); + flb_free(ctx); + return NULL; + } + + if (!S_ISDIR(st.st_mode)) { + flb_errno(); + flb_plg_error(ctx->ins, "given path is not a directory: %s", ctx->path); + flb_free(ctx); + return NULL; + } + } + else { + ctx->path = NULL; + } + + /* Open the Journal */ + if (ctx->path) { + ret = sd_journal_open_directory(&ctx->j, ctx->path, 0); + } + else { + ret = sd_journal_open(&ctx->j, SD_JOURNAL_LOCAL_ONLY); + } + if (ret != 0) { + flb_plg_error(ctx->ins, "could not open the Journal"); + flb_free(ctx); + return NULL; + } + ctx->fd = sd_journal_get_fd(ctx->j); + + /* Tag settings */ + tmp = strchr(ins->tag, '*'); + if (tmp) { + ctx->dynamic_tag = FLB_TRUE; + } + else { + ctx->dynamic_tag = FLB_FALSE; + } + +#ifdef FLB_HAVE_SQLDB + /* Database options (needs to be set before the context) */ + if (ctx->db_sync_mode) { + if (strcasecmp(ctx->db_sync_mode, "extra") == 0) { + ctx->db_sync = 3; + } + else if (strcasecmp(ctx->db_sync_mode, "full") == 0) { + ctx->db_sync = 2; + } + else if (strcasecmp(ctx->db_sync_mode, "normal") == 0) { + ctx->db_sync = 1; + } + else if (strcasecmp(ctx->db_sync_mode, "off") == 0) { + ctx->db_sync = 0; + } + else { + flb_plg_error(ctx->ins, "invalid database 'db.sync' value: %s", ctx->db_sync_mode); + } + } + + /* Database file */ + if (ctx->db_path) { + ctx->db = flb_systemd_db_open(ctx->db_path, ins, ctx, config); + if (!ctx->db) { + flb_plg_error(ctx->ins, "could not open/create database '%s'", ctx->db_path); + } + } + +#endif + + if (ctx->filter_type) { + if (strcasecmp(ctx->filter_type, "and") == 0) { + journal_filter_is_and = FLB_TRUE; + } + else if (strcasecmp(ctx->filter_type, "or") == 0) { + journal_filter_is_and = FLB_FALSE; + } + else { + flb_plg_error(ctx->ins, + "systemd_filter_type must be 'and' or 'or'. Got %s", + ctx->filter_type); + flb_free(ctx); + return NULL; + } + } + else { + journal_filter_is_and = FLB_FALSE; + } + + /* Load Systemd filters */ + if (ctx->systemd_filters) { + flb_config_map_foreach(head, mv, ctx->systemd_filters) { + flb_plg_debug(ctx->ins, "add filter: %s (%s)", mv->val.str, + journal_filter_is_and ? "and" : "or"); + ret = sd_journal_add_match(ctx->j, mv->val.str, 0); + if (ret < 0) { + if (ret == -EINVAL) { + flb_plg_error(ctx->ins, + "systemd_filter error: invalid input '%s'", + mv->val.str); + } + else { + flb_plg_error(ctx->ins, + "systemd_filter error: status=%d input '%s'", + ret, mv->val.str); + } + flb_systemd_config_destroy(ctx); + return NULL; + } + if (journal_filter_is_and) { + ret = sd_journal_add_conjunction(ctx->j); + if (ret < 0) { + flb_plg_error(ctx->ins, + "sd_journal_add_conjunction failed. ret=%d", + ret); + flb_systemd_config_destroy(ctx); + return NULL; + } + } + else { + ret = sd_journal_add_disjunction(ctx->j); + if (ret < 0) { + flb_plg_error(ctx->ins, + "sd_journal_add_disjunction failed. ret=%d", + ret); + flb_systemd_config_destroy(ctx); + return NULL; + } + } + } + } + + if (ctx->read_from_tail == FLB_TRUE) { + sd_journal_seek_tail(ctx->j); + /* + * Skip up to 350 records until the end of journal is found. + * Workaround for bug https://github.com/systemd/systemd/issues/9934 + * Due to the bug, sd_journal_next() returns 2 last records of each journal file. + * 4 GB is the default journal limit, so with 25 MB/file we may get + * up to 4096/25*2 ~= 350 old log messages. See also fluent-bit PR #1565. + */ + ret = sd_journal_next_skip(ctx->j, 350); + flb_plg_debug(ctx->ins, + "jump to the end of journal and skip %d last entries", ret); + } + else { + ret = sd_journal_seek_head(ctx->j); + } + +#ifdef FLB_HAVE_SQLDB + /* Check if we have a cursor in our database */ + if (ctx->db) { + /* Initialize prepared statement */ + ret = sqlite3_prepare_v2(ctx->db->handler, + SQL_UPDATE_CURSOR, + -1, + &ctx->stmt_cursor, + 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement"); + flb_systemd_config_destroy(ctx); + return NULL; + } + + /* Get current cursor */ + cursor = flb_systemd_db_get_cursor(ctx); + if (cursor) { + ret = sd_journal_seek_cursor(ctx->j, cursor); + if (ret == 0) { + flb_plg_info(ctx->ins, "seek_cursor=%.40s... OK", cursor); + + /* Skip the first entry, already processed */ + sd_journal_next_skip(ctx->j, 1); + } + else { + flb_plg_warn(ctx->ins, "seek_cursor failed"); + } + flb_free(cursor); + } + else { + /* Insert the first row */ + cursor = NULL; + flb_systemd_db_init_cursor(ctx, cursor); + if (cursor) { + flb_free(cursor); + } + } + } +#endif + + ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ctx->log_encoder == NULL) { + flb_plg_error(ctx->ins, "could not initialize event encoder"); + flb_systemd_config_destroy(ctx); + + return NULL; + } + + + sd_journal_get_data_threshold(ctx->j, &size); + flb_plg_debug(ctx->ins, + "sd_journal library may truncate values " + "to sd_journal_get_data_threshold() bytes: %zu", size); + + return ctx; +} + +int flb_systemd_config_destroy(struct flb_systemd_config *ctx) +{ + if (ctx->log_encoder != NULL) { + flb_log_event_encoder_destroy(ctx->log_encoder); + + ctx->log_encoder = NULL; + } + + /* Close context */ + if (ctx->j) { + sd_journal_close(ctx->j); + } + +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + sqlite3_finalize(ctx->stmt_cursor); + flb_systemd_db_close(ctx->db); + } +#endif + + close(ctx->ch_manager[0]); + close(ctx->ch_manager[1]); + + flb_free(ctx); + return 0; +} diff --git a/fluent-bit/plugins/in_systemd/systemd_config.h b/fluent-bit/plugins/in_systemd/systemd_config.h new file mode 100644 index 00000000..db5c4cf5 --- /dev/null +++ b/fluent-bit/plugins/in_systemd/systemd_config.h @@ -0,0 +1,82 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_SYSTEMD_CONFIG_H +#define FLB_SYSTEMD_CONFIG_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_sqldb.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <systemd/sd-journal.h> + +/* return values */ +#define FLB_SYSTEMD_ERROR -1 /* Systemd journal file read error. */ +#define FLB_SYSTEMD_NONE 0 +#define FLB_SYSTEMD_OK 1 +#define FLB_SYSTEMD_MORE 2 +#define FLB_SYSTEMD_BUSY 3 + +/* constants */ +#define FLB_SYSTEMD_UNIT "_SYSTEMD_UNIT" +#define FLB_SYSTEMD_UNKNOWN "unknown" +#define FLB_SYSTEMD_MAX_FIELDS "8000" +#define FLB_SYSTEMD_MAX_ENTRIES "5000" + +/* Input configuration & context */ +struct flb_systemd_config { + /* Journal */ + int fd; /* Journal file descriptor */ + sd_journal *j; /* Journal context */ + char *cursor; + flb_sds_t path; + flb_sds_t filter_type; /* sysytemd filter type: and|or */ + struct mk_list *systemd_filters; + int pending_records; + int read_from_tail; /* read_from_tail option */ + int lowercase; + int strip_underscores; + + /* Internal */ + int ch_manager[2]; /* pipe: channel manager */ + int coll_fd_archive; /* archive collector */ + int coll_fd_journal; /* journal, events mode */ + int coll_fd_pending; /* pending records */ + int dynamic_tag; + int max_fields; /* max number of fields per record */ + int max_entries; /* max number of records per iteration */ + +#ifdef FLB_HAVE_SQLDB + flb_sds_t db_path; + struct flb_sqldb *db; + flb_sds_t db_sync_mode; + int db_sync; + sqlite3_stmt *stmt_cursor; +#endif + struct flb_input_instance *ins; + struct flb_log_event_encoder *log_encoder; +}; + +struct flb_systemd_config *flb_systemd_config_create(struct flb_input_instance *i_ins, + struct flb_config *config); + +int flb_systemd_config_destroy(struct flb_systemd_config *ctx); +#endif diff --git a/fluent-bit/plugins/in_systemd/systemd_db.c b/fluent-bit/plugins/in_systemd/systemd_db.c new file mode 100644 index 00000000..0abc4d99 --- /dev/null +++ b/fluent-bit/plugins/in_systemd/systemd_db.c @@ -0,0 +1,197 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_sqldb.h> + +#include "systemd_config.h" +#include "systemd_db.h" + +struct query_status { + int rows; + char *cursor; + time_t updated; +}; + +static int cb_cursor_check(void *data, int argc, char **argv, char **cols) +{ + struct query_status *qs = data; + + qs->cursor = flb_strdup(argv[0]); /* cursor string */ + qs->updated = atoll(argv[1]); /* timestamp */ + qs->rows++; + + return 0; +} + +static int cb_count_check(void *data, int argc, char **argv, char **cols) +{ + struct query_status *qs = data; + + qs->rows = atoll(argv[0]); + return 0; +} + +/* sanitize database table if required */ +static void flb_systemd_db_sanitize(struct flb_sqldb *db, + struct flb_input_instance *ins) +{ + int ret; + struct query_status qs = {0}; + + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(db, + SQL_COUNT_CURSOR, cb_count_check, &qs); + if (ret != FLB_OK) { + flb_plg_error(ins, "db: failed counting number of rows"); + return; + } + + if (qs.rows > 1) { + flb_plg_warn(ins, + "db: table in_systemd_cursor looks corrupted, it has " + "more than one entry (rows=%i), the table content will be " + "fixed", qs.rows); + + /* Delete duplicates, we only preserve the last record based on it ROWID */ + ret = flb_sqldb_query(db, SQL_DELETE_DUPS, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ins, "could not delete in_systemd_cursor duplicates"); + return; + } + flb_plg_info(ins, "table in_systemd_cursor has been fixed"); + } + +} + +struct flb_sqldb *flb_systemd_db_open(const char *path, + struct flb_input_instance *ins, + struct flb_systemd_config *ctx, + struct flb_config *config) +{ + int ret; + char tmp[64]; + struct flb_sqldb *db; + + /* Open/create the database */ + db = flb_sqldb_open(path, ins->name, config); + if (!db) { + return NULL; + } + + /* Create table schema if it don't exists */ + ret = flb_sqldb_query(db, SQL_CREATE_CURSOR, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ins, "db: could not create 'cursor' table"); + flb_sqldb_close(db); + return NULL; + } + + if (ctx->db_sync >= 0) { + snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, + ctx->db_sync); + ret = flb_sqldb_query(db, tmp, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db could not set pragma 'sync'"); + flb_sqldb_close(db); + return NULL; + } + } + + flb_systemd_db_sanitize(db, ins); + + return db; +} + +int flb_systemd_db_close(struct flb_sqldb *db) +{ + flb_sqldb_close(db); + return 0; +} + +int flb_systemd_db_init_cursor(struct flb_systemd_config *ctx, const char *cursor) +{ + int ret; + char query[PATH_MAX]; + struct query_status qs = {0}; + + /* Check if the file exists */ + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(ctx->db, + SQL_GET_CURSOR, cb_cursor_check, &qs); + + if (ret != FLB_OK) { + return -1; + } + + if (qs.rows == 0) { + /* Register the cursor */ + snprintf(query, sizeof(query) - 1, + SQL_INSERT_CURSOR, + cursor, time(NULL)); + ret = flb_sqldb_query(ctx->db, + query, NULL, NULL); + if (ret == FLB_ERROR) { + return -1; + } + return 0; + } + + return -1; +} + +int flb_systemd_db_set_cursor(struct flb_systemd_config *ctx, const char *cursor) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_cursor, 1, (char *) cursor, -1, 0); + sqlite3_bind_int64(ctx->stmt_cursor, 2, time(NULL)); + + ret = sqlite3_step(ctx->stmt_cursor); + + sqlite3_clear_bindings(ctx->stmt_cursor); + sqlite3_reset(ctx->stmt_cursor); + + if (ret != SQLITE_DONE) { + return -1; + } + return 0; +} + +char *flb_systemd_db_get_cursor(struct flb_systemd_config *ctx) +{ + int ret; + struct query_status qs = {0}; + + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(ctx->db, + SQL_GET_CURSOR, cb_cursor_check, &qs); + if (ret != FLB_OK) { + return NULL; + } + + if (qs.rows > 0) { + /* cursor must be freed by the caller */ + return qs.cursor; + } + + return NULL; +} diff --git a/fluent-bit/plugins/in_systemd/systemd_db.h b/fluent-bit/plugins/in_systemd/systemd_db.h new file mode 100644 index 00000000..da8d6e19 --- /dev/null +++ b/fluent-bit/plugins/in_systemd/systemd_db.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_SYSTEMD_DB_H +#define FLB_SYSTEMD_DB_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_config.h> + +#include "systemd_config.h" + +#define SQL_CREATE_CURSOR \ + "CREATE TABLE IF NOT EXISTS in_systemd_cursor (" \ + " cursor TEXT NOT NULL," \ + " updated INTEGER" \ + ");" + +#define SQL_GET_CURSOR \ + "SELECT * FROM in_systemd_cursor LIMIT 1;" + +#define SQL_INSERT_CURSOR \ + "INSERT INTO in_systemd_cursor (cursor, updated)" \ + " VALUES ('%s', %lu);" + +#define SQL_COUNT_CURSOR \ + "SELECT COUNT(*) FROM in_systemd_cursor;" + +#define SQL_UPDATE_CURSOR \ + "UPDATE in_systemd_cursor SET cursor=@cursor, updated=@updated;" + +#define SQL_DELETE_DUPS \ + "DELETE FROM in_systemd_cursor WHERE ROWID < " \ + "(SELECT MAX(ROWID) FROM in_systemd_cursor);" + +#define SQL_PRAGMA_SYNC \ + "PRAGMA synchronous=%i;" + +struct flb_sqldb *flb_systemd_db_open(const char *path, + struct flb_input_instance *ins, + struct flb_systemd_config *ctx, + struct flb_config *config); +int flb_systemd_db_close(struct flb_sqldb *db); +int flb_systemd_db_init_cursor(struct flb_systemd_config *ctx, const char *cursor); +int flb_systemd_db_set_cursor(struct flb_systemd_config *ctx, const char *cursor); +char *flb_systemd_db_get_cursor(struct flb_systemd_config *ctx); + +#endif |