diff options
Diffstat (limited to 'database/engine/metadata_log')
-rw-r--r-- | database/engine/metadata_log/Makefile.am | 8 | ||||
-rw-r--r-- | database/engine/metadata_log/README.md | 0 | ||||
-rw-r--r-- | database/engine/metadata_log/compaction.c | 86 | ||||
-rw-r--r-- | database/engine/metadata_log/compaction.h | 14 | ||||
-rw-r--r-- | database/engine/metadata_log/logfile.c | 453 | ||||
-rw-r--r-- | database/engine/metadata_log/logfile.h | 39 | ||||
-rw-r--r-- | database/engine/metadata_log/metadatalog.h | 28 | ||||
-rwxr-xr-x | database/engine/metadata_log/metadatalogapi.c | 39 | ||||
-rw-r--r-- | database/engine/metadata_log/metadatalogapi.h | 12 | ||||
-rw-r--r-- | database/engine/metadata_log/metadatalogprotocol.h | 53 | ||||
-rwxr-xr-x | database/engine/metadata_log/metalogpluginsd.c | 140 | ||||
-rw-r--r-- | database/engine/metadata_log/metalogpluginsd.h | 33 |
12 files changed, 905 insertions, 0 deletions
diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am new file mode 100644 index 0000000..161784b --- /dev/null +++ b/database/engine/metadata_log/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/database/engine/metadata_log/README.md diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c new file mode 100644 index 0000000..ba19e1e --- /dev/null +++ b/database/engine/metadata_log/compaction.c @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* Return 0 on success. */ +int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files) +{ + int ret; + unsigned starting_fileno, fileno, i, j, recovered_files; + struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + for (i = 0 ; i < *matched_files ; ++i) { + metalogfile = metalogfiles[i]; + if (0 == metalogfile->starting_fileno) + continue; /* skip standard metadata log files */ + break; /* this is a compaction temporary file */ + } + if (i == *matched_files) /* no recovery needed */ + return 0; + info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + + if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */ + error("Metadata log files are in an invalid state. Cannot proceed."); + return 1; + } + compactionfile = metalogfile; + starting_fileno = compactionfile->starting_fileno; + fileno = compactionfile->fileno; + /* scratchpad space to move file pointers around */ + tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles)); + + for (j = 0, recovered_files = 0 ; j < i ; ++j) { + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < starting_fileno) { + tmp_metalogfiles[recovered_files++] = metalogfile; + continue; + } + break; /* reached compaction file serial number */ + } + + if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ || + (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) { + error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno); + unlink_metadata_logfile(compactionfile); + freez(compactionfile); + freez(tmp_metalogfiles); + --*matched_files; /* delete the last one */ + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; + } + + for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */ + metalogfile = metalogfiles[j]; + fatal_assert(0 == metalogfile->starting_fileno); + if (metalogfile->fileno < fileno) { /* It has already been compacted */ + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + freez(metalogfile); + continue; + } + tmp_metalogfiles[recovered_files++] = metalogfile; + } + + /* compaction temporary file is valid */ + tmp_metalogfiles[recovered_files++] = compactionfile; + ret = rename_metadata_logfile(compactionfile, 0, starting_fileno); + if (ret < 0) { + error("Cannot rename temporary compaction files. Cannot proceed."); + freez(tmp_metalogfiles); + return 1; + } + + memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles)); + *matched_files = recovered_files; + freez(tmp_metalogfiles); + + info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); + return 0; +} diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h new file mode 100644 index 0000000..d046134 --- /dev/null +++ b/database/engine/metadata_log/compaction.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_COMPACTION_H +#define NETDATA_COMPACTION_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" + +extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, + unsigned *matched_files); + +#endif /* NETDATA_COMPACTION_H */ diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c new file mode 100644 index 0000000..b7c5c06 --- /dev/null +++ b/database/engine/metadata_log/logfile.c @@ -0,0 +1,453 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include <database/sqlite/sqlite_functions.h> +#include "metadatalog.h" +#include "metalogpluginsd.h" + + +void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION, + metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); +} + +void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno, + unsigned fileno) +{ + metalogfile->starting_fileno = starting_fileno; + metalogfile->fileno = fileno; + metalogfile->file = (uv_file)0; + metalogfile->pos = 0; + metalogfile->next = NULL; + metalogfile->ctx = ctx; +} + +int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX]; + unsigned backup_starting_fileno, backup_fileno; + + backup_starting_fileno = metalogfile->starting_fileno; + backup_fileno = metalogfile->fileno; + generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath)); + metalogfile->starting_fileno = new_starting_fileno; + metalogfile->fileno = new_fileno; + generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath)); + + info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath); + ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL); + if (ret < 0) { + error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret)); + //++ctx->stats.fs_errors; /* this is racy, may miss some errors */ + rrd_stat_atomic_add(&global_fs_errors, 1); + /* restore previous values */ + metalogfile->starting_fileno = backup_starting_fileno; + metalogfile->fileno = backup_fileno; + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_metadata_logfile(struct metadata_logfile *metalogfile) +{ + //struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +static int check_metadata_logfile_superblock(uv_file file) +{ + int ret; + struct rrdeng_metalog_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + if (superblock->version > RRDENG_METALOG_VER) { + error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version); + } +error: + free(superblock); + return ret; +} + +void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload) +{ + struct metalog_instance *ctx = metalogfile->ctx; + char *line, *nextline, *record_end; + int ret; + + debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload); + record_end = (char *)payload + header->payload_length - 1; + *record_end = '\0'; + + for (line = payload ; line ; line = nextline) { + nextline = strchr(line, '\n'); + if (nextline) { + *nextline++ = '\0'; + } + ret = parser_action(ctx->metalog_parser_object->parser, line); + debug(D_METADATALOG, "parser_action ret:%d", ret); + if (ret) + return; /* skip record due to error */ + }; +} + +/* This function only works with buffered I/O */ +static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset) +{ +// struct metalog_instance *ctx; + uv_file file; + uv_buf_t iov; + uv_fs_t req; + int ret; + +// ctx = metalogfile->ctx; + file = metalogfile->file; + iov = uv_buf_init(buf, len); + ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL); + if (unlikely(ret < 0 && ret != req.result)) { + fatal("uv_fs_read: %s", uv_strerror(ret)); + } + if (req.result < 0) { +// ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__, + uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno); + } + uv_fs_req_cleanup(&req); +// ctx->stats.io_read_bytes += len; +// ++ctx->stats.io_read_requests; + + return ret; +} + +/* Return 0 on success */ +static int metadata_record_integrity_check(void *record) +{ + int ret; + uint32_t data_size; + struct rrdeng_metalog_record_header *header; + struct rrdeng_metalog_record_trailer *trailer; + uLong crc; + + header = record; + data_size = header->header_length + header->payload_length; + trailer = record + data_size; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, record, data_size); + ret = crc32cmp(trailer->checksum, crc); + + return ret; +} + +#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */ + +/* + * Iterates metadata log file records and creates database objects (host/chart/dimension) + */ +static void iterate_records(struct metadata_logfile *metalogfile) +{ + uint32_t file_size, pos, bytes_remaining, record_size; + void *buf; + struct rrdeng_metalog_record_header *header; + struct metalog_instance *ctx = metalogfile->ctx; + struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private; + const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) + + sizeof(header->header_length); + + file_size = metalogfile->pos; + state->metalogfile = metalogfile; + + buf = mallocz(MAX_READ_BYTES); + + for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) { + bytes_remaining = file_size - pos; + if (bytes_remaining < min_header_size) { + error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0) + break; + header = (struct rrdeng_metalog_record_header *)buf; + if (METALOG_STORE_PADDING == header->type) { + info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos; + continue; + } + if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size, + pos + min_header_size) < 0) + break; + record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer); + if (header->header_length < min_header_size || record_size > bytes_remaining) { + error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, + metalogfile->fileno); + break; + } + if (record_size > MAX_READ_BYTES) { + error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size, + metalogfile->starting_fileno, metalogfile->fileno); + continue; + } + if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header), + pos + sizeof(*header)) < 0) + break; + if (metadata_record_integrity_check(buf)) { + error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos); + continue; + } + debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__, + pos); + + replay_record(metalogfile, header, buf + header->header_length); + } + + freez(buf); +} + +int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile) +{ + UNUSED(ctx); + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + if (file_is_migrated(path)) + return 0; + + fd = open_file_buffered_io(path, O_RDWR, &file); + if (fd < 0) { +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; + } + info("Loading metadata log \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb)); + if (ret) + goto error; + + ret = check_metadata_logfile_superblock(file); + if (ret) + goto error; +// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); +// ++ctx->stats.io_read_requests; + + metalogfile->file = file; + metalogfile->pos = file_size; + + iterate_records(metalogfile); + + info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size); + add_migrated_file(path, file_size); + return 0; + +error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_metalog_files_cmp(const void *a, const void *b) +{ + struct metadata_logfile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct metadata_logfile **)a; + file2 = *(struct metadata_logfile **)b; + generate_metadata_logfile_path(file1, path1, sizeof(path1)); + generate_metadata_logfile_path(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of metadata logfiles that were loaded or < 0 on error */ +static int scan_metalog_files(struct metalog_instance *ctx) +{ + int ret; + unsigned starting_no, no, matched_files, i, failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct metadata_logfile **metalogfiles, *metalogfile; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); +// ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } + info("Found %d files in path %s", ret, dbfiles_path); + + metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", dbfiles_path, dent.name); + ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no); + if (2 == ret) { + info("Matched file \"%s/%s\"", dbfiles_path, dent.name); + metalogfile = mallocz(sizeof(*metalogfile)); + metadata_logfile_init(metalogfile, ctx, starting_no, no); + metalogfiles[matched_files++] = metalogfile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(metalogfiles); + return 0; + } + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp); + ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files); + if (ret) { /* If the files are corrupted fail */ + for (i = 0 ; i < matched_files ; ++i) { + freez(metalogfiles[i]); + } + freez(metalogfiles); + return UV_EINVAL; + } + //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno; + + struct plugind cd = { + .enabled = 1, + .update_every = 0, + .pid = 0, + .serial_failures = 0, + .successful_collections = 0, + .obsolete = 0, + .started_t = INVALID_TIME, + .next = NULL, + .version = 0, + }; + + struct metalog_pluginsd_state metalog_parser_state; + metalog_pluginsd_state_init(&metalog_parser_state, ctx); + + PARSER_USER_OBJECT metalog_parser_object; + metalog_parser_object.enabled = cd.enabled; + metalog_parser_object.host = ctx->rrdeng_ctx->host; + metalog_parser_object.cd = &cd; + metalog_parser_object.trust_durations = 0; + metalog_parser_object.private = &metalog_parser_state; + + PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT); + if (unlikely(!parser)) { + error("Failed to initialize metadata log parser."); + failed_to_load = matched_files; + goto after_failed_to_parse; + } + parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host); + parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context); + parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone); + parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action; + parser->plugins_action->chart_action = &metalog_pluginsd_chart_action; + parser->plugins_action->guid_action = &metalog_pluginsd_guid_action; + parser->plugins_action->context_action = &metalog_pluginsd_context_action; + parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action; + parser->plugins_action->host_action = &metalog_pluginsd_host_action; + + + metalog_parser_object.parser = parser; + ctx->metalog_parser_object = &metalog_parser_object; + + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + metalogfile = metalogfiles[i]; + db_lock(); + db_execute("BEGIN TRANSACTION;"); + ret = load_metadata_logfile(ctx, metalogfile); + if (0 != ret) { + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); + ++failed_to_load; + db_execute("ROLLBACK TRANSACTION;"); + } + else + db_execute("COMMIT TRANSACTION;"); + db_unlock(); + freez(metalogfile); + } + matched_files -= failed_to_load; + debug(D_METADATALOG, "PARSER ended"); + + parser_destroy(parser); + + size_t count __maybe_unused = metalog_parser_object.count; + + debug(D_METADATALOG, "Parsing count=%u", (unsigned)count); +after_failed_to_parse: + + freez(metalogfiles); + + return matched_files; +} + +/* Return 0 on success. */ +int init_metalog_files(struct metalog_instance *ctx) +{ + int ret; + char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; + + ret = scan_metalog_files(ctx); + if (ret < 0) { + error("Failed to scan path \"%s\".", dbfiles_path); + return ret; + }/* else if (0 == ret) { + ctx->last_fileno = 1; + }*/ + + return 0; +} diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h new file mode 100644 index 0000000..df12ac7 --- /dev/null +++ b/database/engine/metadata_log/logfile.h @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LOGFILE_H +#define NETDATA_LOGFILE_H + +#include "metadatalogprotocol.h" +#include "../rrdengine.h" + +/* Forward declarations */ +struct metadata_logfile; +struct metalog_worker_config; + +#define METALOG_PREFIX "metadatalog-" +#define METALOG_EXTENSION ".mlf" + +/* only one event loop is supported for now */ +struct metadata_logfile { + unsigned fileno; /* Starts at 1 */ + unsigned starting_fileno; /* 0 for normal files, staring number during compaction */ + uv_file file; + uint64_t pos; + struct metalog_instance *ctx; + struct metadata_logfile *next; +}; + +struct metadata_logfile_list { + struct metadata_logfile *first; /* oldest */ + struct metadata_logfile *last; /* newest */ +}; + +extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen); +extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, + unsigned new_fileno); +extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); +extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile); +extern int init_metalog_files(struct metalog_instance *ctx); + + +#endif /* NETDATA_LOGFILE_H */ diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h new file mode 100644 index 0000000..b484686 --- /dev/null +++ b/database/engine/metadata_log/metadatalog.h @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOG_H +#define NETDATA_METADATALOG_H + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include "../rrdengine.h" +#include "metadatalogprotocol.h" +#include "logfile.h" +#include "metadatalogapi.h" +#include "compaction.h" + +/* Forward declerations */ +struct metalog_instance; +struct parser_user_object; + +#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u" +#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u" + +struct metalog_instance { + struct rrdengine_instance *rrdeng_ctx; + struct parser_user_object *metalog_parser_object; + uint8_t initialized; /* set to 1 to mark context initialized */ +}; + +#endif /* NETDATA_METADATALOG_H */ diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c new file mode 100755 index 0000000..b206cca --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.c @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" + +/* + * Returns 0 on success, negative on error + */ +int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx) +{ + struct metalog_instance *ctx; + int error; + + ctx = callocz(1, sizeof(*ctx)); + ctx->initialized = 0; + rrdeng_parent_ctx->metalog_ctx = ctx; + + ctx->rrdeng_ctx = rrdeng_parent_ctx; + error = init_metalog_files(ctx); + if (error) { + goto error_after_init_rrd_files; + } + ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */ + return 0; + +error_after_init_rrd_files: + freez(ctx); + return UV_EIO; +} + +/* This function is called by dbengine rotation logic when the metric has no writers */ +void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid) +{ + uuid_t multihost_uuid; + + delete_dimension_uuid(metric_uuid); + rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid); + delete_dimension_uuid(&multihost_uuid); +} diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h new file mode 100644 index 0000000..d558b93 --- /dev/null +++ b/database/engine/metadata_log/metadatalogapi.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGAPI_H +#define NETDATA_METADATALOGAPI_H + +extern void metalog_commit_delete_chart(RRDSET *st); +extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid); + +/* must call once before using anything */ +extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx); + +#endif /* NETDATA_METADATALOGAPI_H */ diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h new file mode 100644 index 0000000..1017213 --- /dev/null +++ b/database/engine/metadata_log/metadatalogprotocol.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METADATALOGPROTOCOL_H +#define NETDATA_METADATALOGPROTOCOL_H + +#include "../rrddiskprotocol.h" + +#define RRDENG_METALOG_MAGIC "netdata-metadata-log" + +#define RRDENG_METALOG_VER (1) + +#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t))) +/* + * Metadata log persistent super-block + */ +struct rrdeng_metalog_sb { + char magic_number[RRDENG_MAGIC_SZ]; + uint16_t version; + uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ]; +} __attribute__ ((packed)); + +/* + * Metadata log record types + */ +#define METALOG_STORE_PADDING (0) +#define METALOG_CREATE_OBJECT (1) +#define METALOG_DELETE_OBJECT (2) +#define METALOG_OTHER (3) /* reserved */ + +/* + * Metadata log record header + */ +struct rrdeng_metalog_record_header { + /* when set to METALOG_STORE_PADDING jump to start of next block */ + uint8_t type; + + uint16_t header_length; + uint32_t payload_length; + /****************************************************** + * No fields above this point can ever change. * + ****************************************************** + * All fields below this point are subject to change. * + ******************************************************/ +} __attribute__ ((packed)); + +/* + * Metadata log record trailer + */ +struct rrdeng_metalog_record_trailer { + uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ +} __attribute__ ((packed)); + +#endif /* NETDATA_METADATALOGPROTOCOL_H */ diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c new file mode 100755 index 0000000..88c1453 --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.c @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "metadatalog.h" +#include "metalogpluginsd.h" + +extern struct config stream_config; + +PARSER_RC metalog_pluginsd_host_action( + void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, + char *tags) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); + if (host) { + if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) { + error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.", + host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), + rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE)); + ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */ + } + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) { + ((PARSER_USER_OBJECT *) user)->host = host; + return PARSER_RC_OK; + } + + if (likely(!uuid_parse(machine_guid, state->host_uuid))) { + int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags); + if (unlikely(rc)) { + errno = 0; + error("Failed to store host %s with UUID %s in the database", hostname, machine_guid); + } + } + else { + errno = 0; + error("Host machine GUID %s is not valid", machine_guid); + } + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, + char *title, char *units, char *plugin, char *module, int priority, + int update_every, RRDSET_TYPE chart_type, char *options) +{ + UNUSED(options); + + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + + if (unlikely(uuid_is_null(state->host_uuid))) { + debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host."); + return PARSER_RC_OK; + } + uuid_copy(state->chart_uuid, state->uuid); + uuid_clear(state->uuid); /* Consume UUID */ + (void) sql_store_chart(&state->chart_uuid, &state->host_uuid, + type, id, name, family, context, title, units, + plugin, module, priority, update_every, + chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + UNUSED(user); + UNUSED(options); + UNUSED(algorithm); + UNUSED(st); + + if (unlikely(uuid_is_null(state->chart_uuid))) { + debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart."); + info("Ignoring dimension belonging to missing or ignored chart."); + return PARSER_RC_OK; + } + + if (unlikely(uuid_is_null(state->uuid))) { + debug(D_METADATALOG, "Ignoring dimension without unknown UUID"); + info("Ignoring dimension without unknown UUID"); + return PARSER_RC_OK; + } + + (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type); + uuid_clear(state->uuid); /* Consume UUID */ + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid) +{ + struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; + + int rc = find_uuid_type(uuid); + + if (rc == 1) { + uuid_copy(state->host_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 0; + ((PARSER_USER_OBJECT *)user)->host_exists = 1; + } else if (rc == 2) { + uuid_copy(state->chart_uuid, *uuid); + ((PARSER_USER_OBJECT *)user)->st_exists = 1; + } else + uuid_copy(state->uuid, *uuid); + + return PARSER_RC_OK; +} + +PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid) +{ + UNUSED(user); + UNUSED(uuid); + + return PARSER_RC_OK; +} + +void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx) +{ + state->ctx = ctx; + state->skip_record = 0; + uuid_clear(state->uuid); + state->metalogfile = NULL; +} diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h new file mode 100644 index 0000000..96808aa --- /dev/null +++ b/database/engine/metadata_log/metalogpluginsd.h @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_METALOGPLUGINSD_H +#define NETDATA_METALOGPLUGINSD_H + +#include "../../../collectors/plugins.d/pluginsd_parser.h" +#include "../../../collectors/plugins.d/plugins_d.h" +#include "../../../parser/parser.h" + +struct metalog_pluginsd_state { + struct metalog_instance *ctx; + uuid_t uuid; + uuid_t host_uuid; + uuid_t chart_uuid; + uint8_t skip_record; /* skip this record due to errors in parsing */ + struct metadata_logfile *metalogfile; /* current metadata log file being replayed */ +}; + +extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx); + +extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, + char *context, char *title, char *units, char *plugin, char *module, + int priority, int update_every, RRDSET_TYPE chart_type, char *options); +extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, + long multiplier, long divisor, char *options, + RRD_ALGORITHM algorithm_type); +extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid); +extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action); +extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags); + +#endif /* NETDATA_METALOGPLUGINSD_H */ |