summaryrefslogtreecommitdiffstats
path: root/database/engine/metadata_log
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 11:08:07 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 11:08:07 +0000
commitc69cb8cc094cc916adbc516b09e944cd3d137c01 (patch)
treef2878ec41fb6d0e3613906c6722fc02b934eeb80 /database/engine/metadata_log
parentInitial commit. (diff)
downloadnetdata-c69cb8cc094cc916adbc516b09e944cd3d137c01.tar.xz
netdata-c69cb8cc094cc916adbc516b09e944cd3d137c01.zip
Adding upstream version 1.29.3.upstream/1.29.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/metadata_log')
-rw-r--r--database/engine/metadata_log/Makefile.am8
-rw-r--r--database/engine/metadata_log/README.md0
-rw-r--r--database/engine/metadata_log/compaction.c86
-rw-r--r--database/engine/metadata_log/compaction.h14
-rw-r--r--database/engine/metadata_log/logfile.c453
-rw-r--r--database/engine/metadata_log/logfile.h39
-rw-r--r--database/engine/metadata_log/metadatalog.h28
-rwxr-xr-xdatabase/engine/metadata_log/metadatalogapi.c39
-rw-r--r--database/engine/metadata_log/metadatalogapi.h12
-rw-r--r--database/engine/metadata_log/metadatalogprotocol.h53
-rwxr-xr-xdatabase/engine/metadata_log/metalogpluginsd.c140
-rw-r--r--database/engine/metadata_log/metalogpluginsd.h33
12 files changed, 905 insertions, 0 deletions
diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am
new file mode 100644
index 0000000..161784b
--- /dev/null
+++ b/database/engine/metadata_log/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/metadata_log/README.md b/database/engine/metadata_log/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/database/engine/metadata_log/README.md
diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c
new file mode 100644
index 0000000..ba19e1e
--- /dev/null
+++ b/database/engine/metadata_log/compaction.c
@@ -0,0 +1,86 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/* Return 0 on success. */
+int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files)
+{
+ int ret;
+ unsigned starting_fileno, fileno, i, j, recovered_files;
+ struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ for (i = 0 ; i < *matched_files ; ++i) {
+ metalogfile = metalogfiles[i];
+ if (0 == metalogfile->starting_fileno)
+ continue; /* skip standard metadata log files */
+ break; /* this is a compaction temporary file */
+ }
+ if (i == *matched_files) /* no recovery needed */
+ return 0;
+ info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+
+ if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */
+ error("Metadata log files are in an invalid state. Cannot proceed.");
+ return 1;
+ }
+ compactionfile = metalogfile;
+ starting_fileno = compactionfile->starting_fileno;
+ fileno = compactionfile->fileno;
+ /* scratchpad space to move file pointers around */
+ tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles));
+
+ for (j = 0, recovered_files = 0 ; j < i ; ++j) {
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < starting_fileno) {
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ continue;
+ }
+ break; /* reached compaction file serial number */
+ }
+
+ if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ ||
+ (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) {
+ error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno);
+ unlink_metadata_logfile(compactionfile);
+ freez(compactionfile);
+ freez(tmp_metalogfiles);
+ --*matched_files; /* delete the last one */
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+ }
+
+ for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */
+ metalogfile = metalogfiles[j];
+ fatal_assert(0 == metalogfile->starting_fileno);
+ if (metalogfile->fileno < fileno) { /* It has already been compacted */
+ error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno);
+ unlink_metadata_logfile(metalogfile);
+ freez(metalogfile);
+ continue;
+ }
+ tmp_metalogfiles[recovered_files++] = metalogfile;
+ }
+
+ /* compaction temporary file is valid */
+ tmp_metalogfiles[recovered_files++] = compactionfile;
+ ret = rename_metadata_logfile(compactionfile, 0, starting_fileno);
+ if (ret < 0) {
+ error("Cannot rename temporary compaction files. Cannot proceed.");
+ freez(tmp_metalogfiles);
+ return 1;
+ }
+
+ memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles));
+ *matched_files = recovered_files;
+ freez(tmp_metalogfiles);
+
+ info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
+ return 0;
+}
diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h
new file mode 100644
index 0000000..d046134
--- /dev/null
+++ b/database/engine/metadata_log/compaction.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_COMPACTION_H
+#define NETDATA_COMPACTION_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+
+extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
+ unsigned *matched_files);
+
+#endif /* NETDATA_COMPACTION_H */
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
new file mode 100644
index 0000000..b7c5c06
--- /dev/null
+++ b/database/engine/metadata_log/logfile.c
@@ -0,0 +1,453 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include <database/sqlite/sqlite_functions.h>
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+
+void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION,
+ metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
+}
+
+void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno,
+ unsigned fileno)
+{
+ metalogfile->starting_fileno = starting_fileno;
+ metalogfile->fileno = fileno;
+ metalogfile->file = (uv_file)0;
+ metalogfile->pos = 0;
+ metalogfile->next = NULL;
+ metalogfile->ctx = ctx;
+}
+
+int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
+{
+ //struct metalog_instance *ctx = metalogfile->ctx;
+ uv_fs_t req;
+ int ret;
+ char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
+ unsigned backup_starting_fileno, backup_fileno;
+
+ backup_starting_fileno = metalogfile->starting_fileno;
+ backup_fileno = metalogfile->fileno;
+ generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath));
+ metalogfile->starting_fileno = new_starting_fileno;
+ metalogfile->fileno = new_fileno;
+ generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath));
+
+ info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath);
+ ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
+ if (ret < 0) {
+ error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
+ //++ctx->stats.fs_errors; /* this is racy, may miss some errors */
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ /* restore previous values */
+ metalogfile->starting_fileno = backup_starting_fileno;
+ metalogfile->fileno = backup_fileno;
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
+{
+ //struct metalog_instance *ctx = metalogfile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+static int check_metadata_logfile_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_metalog_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ if (superblock->version > RRDENG_METALOG_VER) {
+ error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version);
+ }
+error:
+ free(superblock);
+ return ret;
+}
+
+void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload)
+{
+ struct metalog_instance *ctx = metalogfile->ctx;
+ char *line, *nextline, *record_end;
+ int ret;
+
+ debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload);
+ record_end = (char *)payload + header->payload_length - 1;
+ *record_end = '\0';
+
+ for (line = payload ; line ; line = nextline) {
+ nextline = strchr(line, '\n');
+ if (nextline) {
+ *nextline++ = '\0';
+ }
+ ret = parser_action(ctx->metalog_parser_object->parser, line);
+ debug(D_METADATALOG, "parser_action ret:%d", ret);
+ if (ret)
+ return; /* skip record due to error */
+ };
+}
+
+/* This function only works with buffered I/O */
+static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
+{
+// struct metalog_instance *ctx;
+ uv_file file;
+ uv_buf_t iov;
+ uv_fs_t req;
+ int ret;
+
+// ctx = metalogfile->ctx;
+ file = metalogfile->file;
+ iov = uv_buf_init(buf, len);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL);
+ if (unlikely(ret < 0 && ret != req.result)) {
+ fatal("uv_fs_read: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+// ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
+ uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
+ }
+ uv_fs_req_cleanup(&req);
+// ctx->stats.io_read_bytes += len;
+// ++ctx->stats.io_read_requests;
+
+ return ret;
+}
+
+/* Return 0 on success */
+static int metadata_record_integrity_check(void *record)
+{
+ int ret;
+ uint32_t data_size;
+ struct rrdeng_metalog_record_header *header;
+ struct rrdeng_metalog_record_trailer *trailer;
+ uLong crc;
+
+ header = record;
+ data_size = header->header_length + header->payload_length;
+ trailer = record + data_size;
+
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, record, data_size);
+ ret = crc32cmp(trailer->checksum, crc);
+
+ return ret;
+}
+
+#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */
+
+/*
+ * Iterates metadata log file records and creates database objects (host/chart/dimension)
+ */
+static void iterate_records(struct metadata_logfile *metalogfile)
+{
+ uint32_t file_size, pos, bytes_remaining, record_size;
+ void *buf;
+ struct rrdeng_metalog_record_header *header;
+ struct metalog_instance *ctx = metalogfile->ctx;
+ struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private;
+ const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) +
+ sizeof(header->header_length);
+
+ file_size = metalogfile->pos;
+ state->metalogfile = metalogfile;
+
+ buf = mallocz(MAX_READ_BYTES);
+
+ for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) {
+ bytes_remaining = file_size - pos;
+ if (bytes_remaining < min_header_size) {
+ error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ break;
+ }
+ if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0)
+ break;
+ header = (struct rrdeng_metalog_record_header *)buf;
+ if (METALOG_STORE_PADDING == header->type) {
+ info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos;
+ continue;
+ }
+ if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size,
+ pos + min_header_size) < 0)
+ break;
+ record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer);
+ if (header->header_length < min_header_size || record_size > bytes_remaining) {
+ error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
+ metalogfile->fileno);
+ break;
+ }
+ if (record_size > MAX_READ_BYTES) {
+ error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size,
+ metalogfile->starting_fileno, metalogfile->fileno);
+ continue;
+ }
+ if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header),
+ pos + sizeof(*header)) < 0)
+ break;
+ if (metadata_record_integrity_check(buf)) {
+ error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos);
+ continue;
+ }
+ debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__,
+ pos);
+
+ replay_record(metalogfile, header, buf + header->header_length);
+ }
+
+ freez(buf);
+}
+
+int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
+{
+ UNUSED(ctx);
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd, error;
+ uint64_t file_size;
+ char path[RRDENG_PATH_MAX];
+
+ generate_metadata_logfile_path(metalogfile, path, sizeof(path));
+ if (file_is_migrated(path))
+ return 0;
+
+ fd = open_file_buffered_io(path, O_RDWR, &file);
+ if (fd < 0) {
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
+ }
+ info("Loading metadata log \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb));
+ if (ret)
+ goto error;
+
+ ret = check_metadata_logfile_superblock(file);
+ if (ret)
+ goto error;
+// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
+// ++ctx->stats.io_read_requests;
+
+ metalogfile->file = file;
+ metalogfile->pos = file_size;
+
+ iterate_records(metalogfile);
+
+ info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size);
+ add_migrated_file(path, file_size);
+ return 0;
+
+error:
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+ return error;
+}
+
+static int scan_metalog_files_cmp(const void *a, const void *b)
+{
+ struct metadata_logfile *file1, *file2;
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
+
+ file1 = *(struct metadata_logfile **)a;
+ file2 = *(struct metadata_logfile **)b;
+ generate_metadata_logfile_path(file1, path1, sizeof(path1));
+ generate_metadata_logfile_path(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of metadata logfiles that were loaded or < 0 on error */
+static int scan_metalog_files(struct metalog_instance *ctx)
+{
+ int ret;
+ unsigned starting_no, no, matched_files, i, failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct metadata_logfile **metalogfiles, *metalogfile;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
+ if (ret < 0) {
+ fatal_assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+// ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
+ info("Found %d files in path %s", ret, dbfiles_path);
+
+ metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s/%s\"", dbfiles_path, dent.name);
+ ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no);
+ if (2 == ret) {
+ info("Matched file \"%s/%s\"", dbfiles_path, dent.name);
+ metalogfile = mallocz(sizeof(*metalogfile));
+ metadata_logfile_init(metalogfile, ctx, starting_no, no);
+ metalogfiles[matched_files++] = metalogfile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (0 == matched_files) {
+ freez(metalogfiles);
+ return 0;
+ }
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp);
+ ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files);
+ if (ret) { /* If the files are corrupted fail */
+ for (i = 0 ; i < matched_files ; ++i) {
+ freez(metalogfiles[i]);
+ }
+ freez(metalogfiles);
+ return UV_EINVAL;
+ }
+ //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
+
+ struct plugind cd = {
+ .enabled = 1,
+ .update_every = 0,
+ .pid = 0,
+ .serial_failures = 0,
+ .successful_collections = 0,
+ .obsolete = 0,
+ .started_t = INVALID_TIME,
+ .next = NULL,
+ .version = 0,
+ };
+
+ struct metalog_pluginsd_state metalog_parser_state;
+ metalog_pluginsd_state_init(&metalog_parser_state, ctx);
+
+ PARSER_USER_OBJECT metalog_parser_object;
+ metalog_parser_object.enabled = cd.enabled;
+ metalog_parser_object.host = ctx->rrdeng_ctx->host;
+ metalog_parser_object.cd = &cd;
+ metalog_parser_object.trust_durations = 0;
+ metalog_parser_object.private = &metalog_parser_state;
+
+ PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
+ if (unlikely(!parser)) {
+ error("Failed to initialize metadata log parser.");
+ failed_to_load = matched_files;
+ goto after_failed_to_parse;
+ }
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone);
+ parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action;
+ parser->plugins_action->chart_action = &metalog_pluginsd_chart_action;
+ parser->plugins_action->guid_action = &metalog_pluginsd_guid_action;
+ parser->plugins_action->context_action = &metalog_pluginsd_context_action;
+ parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action;
+ parser->plugins_action->host_action = &metalog_pluginsd_host_action;
+
+
+ metalog_parser_object.parser = parser;
+ ctx->metalog_parser_object = &metalog_parser_object;
+
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ metalogfile = metalogfiles[i];
+ db_lock();
+ db_execute("BEGIN TRANSACTION;");
+ ret = load_metadata_logfile(ctx, metalogfile);
+ if (0 != ret) {
+ error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
+ METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
+ unlink_metadata_logfile(metalogfile);
+ ++failed_to_load;
+ db_execute("ROLLBACK TRANSACTION;");
+ }
+ else
+ db_execute("COMMIT TRANSACTION;");
+ db_unlock();
+ freez(metalogfile);
+ }
+ matched_files -= failed_to_load;
+ debug(D_METADATALOG, "PARSER ended");
+
+ parser_destroy(parser);
+
+ size_t count __maybe_unused = metalog_parser_object.count;
+
+ debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
+after_failed_to_parse:
+
+ freez(metalogfiles);
+
+ return matched_files;
+}
+
+/* Return 0 on success. */
+int init_metalog_files(struct metalog_instance *ctx)
+{
+ int ret;
+ char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
+
+ ret = scan_metalog_files(ctx);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", dbfiles_path);
+ return ret;
+ }/* else if (0 == ret) {
+ ctx->last_fileno = 1;
+ }*/
+
+ return 0;
+}
diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h
new file mode 100644
index 0000000..df12ac7
--- /dev/null
+++ b/database/engine/metadata_log/logfile.h
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LOGFILE_H
+#define NETDATA_LOGFILE_H
+
+#include "metadatalogprotocol.h"
+#include "../rrdengine.h"
+
+/* Forward declarations */
+struct metadata_logfile;
+struct metalog_worker_config;
+
+#define METALOG_PREFIX "metadatalog-"
+#define METALOG_EXTENSION ".mlf"
+
+/* only one event loop is supported for now */
+struct metadata_logfile {
+ unsigned fileno; /* Starts at 1 */
+ unsigned starting_fileno; /* 0 for normal files, staring number during compaction */
+ uv_file file;
+ uint64_t pos;
+ struct metalog_instance *ctx;
+ struct metadata_logfile *next;
+};
+
+struct metadata_logfile_list {
+ struct metadata_logfile *first; /* oldest */
+ struct metadata_logfile *last; /* newest */
+};
+
+extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen);
+extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno,
+ unsigned new_fileno);
+extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile);
+extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile);
+extern int init_metalog_files(struct metalog_instance *ctx);
+
+
+#endif /* NETDATA_LOGFILE_H */
diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h
new file mode 100644
index 0000000..b484686
--- /dev/null
+++ b/database/engine/metadata_log/metadatalog.h
@@ -0,0 +1,28 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOG_H
+#define NETDATA_METADATALOG_H
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include "../rrdengine.h"
+#include "metadatalogprotocol.h"
+#include "logfile.h"
+#include "metadatalogapi.h"
+#include "compaction.h"
+
+/* Forward declerations */
+struct metalog_instance;
+struct parser_user_object;
+
+#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u"
+#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u"
+
+struct metalog_instance {
+ struct rrdengine_instance *rrdeng_ctx;
+ struct parser_user_object *metalog_parser_object;
+ uint8_t initialized; /* set to 1 to mark context initialized */
+};
+
+#endif /* NETDATA_METADATALOG_H */
diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c
new file mode 100755
index 0000000..b206cca
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.c
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+
+/*
+ * Returns 0 on success, negative on error
+ */
+int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx)
+{
+ struct metalog_instance *ctx;
+ int error;
+
+ ctx = callocz(1, sizeof(*ctx));
+ ctx->initialized = 0;
+ rrdeng_parent_ctx->metalog_ctx = ctx;
+
+ ctx->rrdeng_ctx = rrdeng_parent_ctx;
+ error = init_metalog_files(ctx);
+ if (error) {
+ goto error_after_init_rrd_files;
+ }
+ ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */
+ return 0;
+
+error_after_init_rrd_files:
+ freez(ctx);
+ return UV_EIO;
+}
+
+/* This function is called by dbengine rotation logic when the metric has no writers */
+void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
+{
+ uuid_t multihost_uuid;
+
+ delete_dimension_uuid(metric_uuid);
+ rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
+ delete_dimension_uuid(&multihost_uuid);
+}
diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h
new file mode 100644
index 0000000..d558b93
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogapi.h
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGAPI_H
+#define NETDATA_METADATALOGAPI_H
+
+extern void metalog_commit_delete_chart(RRDSET *st);
+extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
+
+/* must call once before using anything */
+extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx);
+
+#endif /* NETDATA_METADATALOGAPI_H */
diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h
new file mode 100644
index 0000000..1017213
--- /dev/null
+++ b/database/engine/metadata_log/metadatalogprotocol.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METADATALOGPROTOCOL_H
+#define NETDATA_METADATALOGPROTOCOL_H
+
+#include "../rrddiskprotocol.h"
+
+#define RRDENG_METALOG_MAGIC "netdata-metadata-log"
+
+#define RRDENG_METALOG_VER (1)
+
+#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t)))
+/*
+ * Metadata log persistent super-block
+ */
+struct rrdeng_metalog_sb {
+ char magic_number[RRDENG_MAGIC_SZ];
+ uint16_t version;
+ uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ];
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record types
+ */
+#define METALOG_STORE_PADDING (0)
+#define METALOG_CREATE_OBJECT (1)
+#define METALOG_DELETE_OBJECT (2)
+#define METALOG_OTHER (3) /* reserved */
+
+/*
+ * Metadata log record header
+ */
+struct rrdeng_metalog_record_header {
+ /* when set to METALOG_STORE_PADDING jump to start of next block */
+ uint8_t type;
+
+ uint16_t header_length;
+ uint32_t payload_length;
+ /******************************************************
+ * No fields above this point can ever change. *
+ ******************************************************
+ * All fields below this point are subject to change. *
+ ******************************************************/
+} __attribute__ ((packed));
+
+/*
+ * Metadata log record trailer
+ */
+struct rrdeng_metalog_record_trailer {
+ uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
+} __attribute__ ((packed));
+
+#endif /* NETDATA_METADATALOGPROTOCOL_H */
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c
new file mode 100755
index 0000000..88c1453
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.c
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+
+#include "metadatalog.h"
+#include "metalogpluginsd.h"
+
+extern struct config stream_config;
+
+PARSER_RC metalog_pluginsd_host_action(
+ void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone,
+ char *tags)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ if (host) {
+ if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
+ error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.",
+ host->hostname, rrd_memory_mode_name(host->rrd_memory_mode),
+ rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE));
+ ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */
+ }
+ ((PARSER_USER_OBJECT *) user)->host = host;
+ return PARSER_RC_OK;
+ }
+
+ if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
+ ((PARSER_USER_OBJECT *) user)->host = host;
+ return PARSER_RC_OK;
+ }
+
+ if (likely(!uuid_parse(machine_guid, state->host_uuid))) {
+ int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags);
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to store host %s with UUID %s in the database", hostname, machine_guid);
+ }
+ }
+ else {
+ errno = 0;
+ error("Host machine GUID %s is not valid", machine_guid);
+ }
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context,
+ char *title, char *units, char *plugin, char *module, int priority,
+ int update_every, RRDSET_TYPE chart_type, char *options)
+{
+ UNUSED(options);
+
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+ RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+ if (unlikely(uuid_is_null(state->host_uuid))) {
+ debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host.");
+ return PARSER_RC_OK;
+ }
+ uuid_copy(state->chart_uuid, state->uuid);
+ uuid_clear(state->uuid); /* Consume UUID */
+ (void) sql_store_chart(&state->chart_uuid, &state->host_uuid,
+ type, id, name, family, context, title, units,
+ plugin, module, priority, update_every,
+ chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 1;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+ long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+ UNUSED(user);
+ UNUSED(options);
+ UNUSED(algorithm);
+ UNUSED(st);
+
+ if (unlikely(uuid_is_null(state->chart_uuid))) {
+ debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart.");
+ info("Ignoring dimension belonging to missing or ignored chart.");
+ return PARSER_RC_OK;
+ }
+
+ if (unlikely(uuid_is_null(state->uuid))) {
+ debug(D_METADATALOG, "Ignoring dimension without unknown UUID");
+ info("Ignoring dimension without unknown UUID");
+ return PARSER_RC_OK;
+ }
+
+ (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type);
+ uuid_clear(state->uuid); /* Consume UUID */
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ uuid_copy(state->uuid, *uuid);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid)
+{
+ struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
+
+ int rc = find_uuid_type(uuid);
+
+ if (rc == 1) {
+ uuid_copy(state->host_uuid, *uuid);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 0;
+ ((PARSER_USER_OBJECT *)user)->host_exists = 1;
+ } else if (rc == 2) {
+ uuid_copy(state->chart_uuid, *uuid);
+ ((PARSER_USER_OBJECT *)user)->st_exists = 1;
+ } else
+ uuid_copy(state->uuid, *uuid);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid)
+{
+ UNUSED(user);
+ UNUSED(uuid);
+
+ return PARSER_RC_OK;
+}
+
+void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx)
+{
+ state->ctx = ctx;
+ state->skip_record = 0;
+ uuid_clear(state->uuid);
+ state->metalogfile = NULL;
+}
diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h
new file mode 100644
index 0000000..96808aa
--- /dev/null
+++ b/database/engine/metadata_log/metalogpluginsd.h
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_METALOGPLUGINSD_H
+#define NETDATA_METALOGPLUGINSD_H
+
+#include "../../../collectors/plugins.d/pluginsd_parser.h"
+#include "../../../collectors/plugins.d/plugins_d.h"
+#include "../../../parser/parser.h"
+
+struct metalog_pluginsd_state {
+ struct metalog_instance *ctx;
+ uuid_t uuid;
+ uuid_t host_uuid;
+ uuid_t chart_uuid;
+ uint8_t skip_record; /* skip this record due to errors in parsing */
+ struct metadata_logfile *metalogfile; /* current metadata log file being replayed */
+};
+
+extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx);
+
+extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family,
+ char *context, char *title, char *units, char *plugin, char *module,
+ int priority, int update_every, RRDSET_TYPE chart_type, char *options);
+extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
+ long multiplier, long divisor, char *options,
+ RRD_ALGORITHM algorithm_type);
+extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid);
+extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags);
+
+#endif /* NETDATA_METALOGPLUGINSD_H */