diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_systemd/systemd_db.c')
-rw-r--r-- | src/fluent-bit/plugins/in_systemd/systemd_db.c | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_systemd/systemd_db.c b/src/fluent-bit/plugins/in_systemd/systemd_db.c new file mode 100644 index 000000000..0abc4d999 --- /dev/null +++ b/src/fluent-bit/plugins/in_systemd/systemd_db.c @@ -0,0 +1,197 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_sqldb.h> + +#include "systemd_config.h" +#include "systemd_db.h" + +struct query_status { + int rows; + char *cursor; + time_t updated; +}; + +static int cb_cursor_check(void *data, int argc, char **argv, char **cols) +{ + struct query_status *qs = data; + + qs->cursor = flb_strdup(argv[0]); /* cursor string */ + qs->updated = atoll(argv[1]); /* timestamp */ + qs->rows++; + + return 0; +} + +static int cb_count_check(void *data, int argc, char **argv, char **cols) +{ + struct query_status *qs = data; + + qs->rows = atoll(argv[0]); + return 0; +} + +/* sanitize database table if required */ +static void flb_systemd_db_sanitize(struct flb_sqldb *db, + struct flb_input_instance *ins) +{ + int ret; + struct query_status qs = {0}; + + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(db, + SQL_COUNT_CURSOR, cb_count_check, &qs); + if (ret != FLB_OK) { + flb_plg_error(ins, "db: failed counting number of rows"); + return; + } + + if (qs.rows > 1) { + flb_plg_warn(ins, + "db: table in_systemd_cursor looks corrupted, it has " + "more than one entry (rows=%i), the table content will be " + "fixed", qs.rows); + + /* Delete duplicates, we only preserve the last record based on it ROWID */ + ret = flb_sqldb_query(db, SQL_DELETE_DUPS, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ins, "could not delete in_systemd_cursor duplicates"); + return; + } + flb_plg_info(ins, "table in_systemd_cursor has been fixed"); + } + +} + +struct flb_sqldb *flb_systemd_db_open(const char *path, + struct flb_input_instance *ins, + struct flb_systemd_config *ctx, + struct flb_config *config) +{ + int ret; + char tmp[64]; + struct flb_sqldb *db; + + /* Open/create the database */ + db = flb_sqldb_open(path, ins->name, config); + if (!db) { + return NULL; + } + + /* Create table schema if it don't exists */ + ret = flb_sqldb_query(db, SQL_CREATE_CURSOR, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ins, "db: could not create 'cursor' table"); + flb_sqldb_close(db); + return NULL; + } + + if (ctx->db_sync >= 0) { + snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, + ctx->db_sync); + ret = flb_sqldb_query(db, tmp, NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db could not set pragma 'sync'"); + flb_sqldb_close(db); + return NULL; + } + } + + flb_systemd_db_sanitize(db, ins); + + return db; +} + +int flb_systemd_db_close(struct flb_sqldb *db) +{ + flb_sqldb_close(db); + return 0; +} + +int flb_systemd_db_init_cursor(struct flb_systemd_config *ctx, const char *cursor) +{ + int ret; + char query[PATH_MAX]; + struct query_status qs = {0}; + + /* Check if the file exists */ + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(ctx->db, + SQL_GET_CURSOR, cb_cursor_check, &qs); + + if (ret != FLB_OK) { + return -1; + } + + if (qs.rows == 0) { + /* Register the cursor */ + snprintf(query, sizeof(query) - 1, + SQL_INSERT_CURSOR, + cursor, time(NULL)); + ret = flb_sqldb_query(ctx->db, + query, NULL, NULL); + if (ret == FLB_ERROR) { + return -1; + } + return 0; + } + + return -1; +} + +int flb_systemd_db_set_cursor(struct flb_systemd_config *ctx, const char *cursor) +{ + int ret; + + /* Bind parameters */ + sqlite3_bind_text(ctx->stmt_cursor, 1, (char *) cursor, -1, 0); + sqlite3_bind_int64(ctx->stmt_cursor, 2, time(NULL)); + + ret = sqlite3_step(ctx->stmt_cursor); + + sqlite3_clear_bindings(ctx->stmt_cursor); + sqlite3_reset(ctx->stmt_cursor); + + if (ret != SQLITE_DONE) { + return -1; + } + return 0; +} + +char *flb_systemd_db_get_cursor(struct flb_systemd_config *ctx) +{ + int ret; + struct query_status qs = {0}; + + memset(&qs, '\0', sizeof(qs)); + ret = flb_sqldb_query(ctx->db, + SQL_GET_CURSOR, cb_cursor_check, &qs); + if (ret != FLB_OK) { + return NULL; + } + + if (qs.rows > 0) { + /* cursor must be freed by the caller */ + return qs.cursor; + } + + return NULL; +} |