summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/Makefile.am1
-rw-r--r--database/engine/datafile.c31
-rw-r--r--database/engine/datafile.h22
-rw-r--r--database/engine/journalfile.c113
-rw-r--r--database/engine/journalfile.h22
-rw-r--r--database/engine/metadata_log/Makefile.am8
-rw-r--r--database/engine/metadata_log/compaction.c86
-rw-r--r--database/engine/metadata_log/compaction.h14
-rw-r--r--database/engine/metadata_log/logfile.c447
-rw-r--r--database/engine/metadata_log/logfile.h39
-rw-r--r--database/engine/metadata_log/metadatalog.h28
-rwxr-xr-xdatabase/engine/metadata_log/metadatalogapi.c39
-rw-r--r--database/engine/metadata_log/metadatalogapi.h12
-rw-r--r--database/engine/metadata_log/metadatalogprotocol.h53
-rwxr-xr-xdatabase/engine/metadata_log/metalogpluginsd.c140
-rw-r--r--database/engine/metadata_log/metalogpluginsd.h33
-rw-r--r--database/engine/pagecache.c256
-rw-r--r--database/engine/pagecache.h131
-rw-r--r--database/engine/rrddiskprotocol.h4
-rw-r--r--database/engine/rrdengine.c188
-rw-r--r--database/engine/rrdengine.h50
-rwxr-xr-xdatabase/engine/rrdengineapi.c606
-rw-r--r--database/engine/rrdengineapi.h67
-rw-r--r--database/engine/rrdenginelib.c57
-rw-r--r--database/engine/rrdenginelib.h14
-rw-r--r--database/engine/rrdenglocking.h10
26 files changed, 902 insertions, 1569 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
index 43405001d..59250a997 100644
--- a/database/engine/Makefile.am
+++ b/database/engine/Makefile.am
@@ -4,7 +4,6 @@ AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
SUBDIRS = \
- metadata_log \
$(NULL)
dist_noinst_DATA = \
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 2ed98ef88..9c70068d9 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -174,7 +174,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
- free(superblock);
+ posix_memfree(superblock);
if (ret < 0) {
destroy_data_file(datafile);
return ret;
@@ -218,7 +218,7 @@ static int check_data_file_superblock(uv_file file)
ret = 0;
}
error:
- free(superblock);
+ posix_memfree(superblock);
return ret;
}
@@ -444,44 +444,17 @@ void finalize_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
struct extent_info *extent, *next_extent;
- size_t extents_number = 0;
- size_t extents_bytes = 0;
- size_t page_compressed_sizes = 0;
-
- size_t files_number = 0;
- size_t files_bytes = 0;
-
for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
journalfile = datafile->journalfile;
next_datafile = datafile->next;
for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
- extents_number++;
- extents_bytes += sizeof(*extent) + sizeof(struct rrdeng_page_descr *) * extent->number_of_pages;
- page_compressed_sizes += extent->size;
-
next_extent = extent->next;
freez(extent);
}
close_journal_file(journalfile, datafile);
close_data_file(datafile);
-
- files_number++;
- files_bytes += sizeof(*journalfile) + sizeof(*datafile);
-
freez(journalfile);
freez(datafile);
}
-
- if(!files_number) files_number = 1;
- if(!extents_number) extents_number = 1;
-
- info("DBENGINE STATISTICS ON DATAFILES:"
- " Files %zu, structures %zu bytes, %0.2f bytes per file."
- " Extents %zu, structures %zu bytes, %0.2f bytes per extent."
- " Compressed size of all pages: %zu bytes."
- , files_number, files_bytes, (double)files_bytes/files_number
- , extents_number, extents_bytes, (double)extents_bytes/extents_number
- , page_compressed_sizes
- );
}
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index ae94bfdd0..1cf256aff 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -52,16 +52,16 @@ struct rrdengine_datafile_list {
struct rrdengine_datafile *last; /* newest */
};
-extern void df_extent_insert(struct extent_info *extent);
-extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
-extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
-extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
-extern int close_data_file(struct rrdengine_datafile *datafile);
-extern int unlink_data_file(struct rrdengine_datafile *datafile);
-extern int destroy_data_file(struct rrdengine_datafile *datafile);
-extern int create_data_file(struct rrdengine_datafile *datafile);
-extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
-extern int init_data_files(struct rrdengine_instance *ctx);
-extern void finalize_data_files(struct rrdengine_instance *ctx);
+void df_extent_insert(struct extent_info *extent);
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+int close_data_file(struct rrdengine_datafile *datafile);
+int unlink_data_file(struct rrdengine_datafile *datafile);
+int destroy_data_file(struct rrdengine_datafile *datafile);
+int create_data_file(struct rrdengine_datafile *datafile);
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+int init_data_files(struct rrdengine_instance *ctx);
+void finalize_data_files(struct rrdengine_instance *ctx);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index dc61f569d..500dd7880 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -17,7 +17,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
}
uv_fs_req_cleanup(req);
- free(io_descr->buf);
+ posix_memfree(io_descr->buf);
freez(io_descr);
}
@@ -225,7 +225,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
- free(superblock);
+ posix_memfree(superblock);
if (ret < 0) {
destroy_journal_file(journalfile, datafile);
return ret;
@@ -268,7 +268,7 @@ static int check_journal_file_superblock(uv_file file)
ret = 0;
}
error:
- free(superblock);
+ posix_memfree(superblock);
return ret;
}
@@ -311,20 +311,46 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
}
continue;
}
- uint64_t start_time = jf_metric_data->descr[i].start_time;
- uint64_t end_time = jf_metric_data->descr[i].end_time;
+ uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut;
+ uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut;
+ size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
+ time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0;
+
+ if (unlikely(start_time_ut > end_time_ut)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut;
+ continue;
+ }
- if (unlikely(start_time > end_time)) {
- error("Invalid page encountered, start time %lu > end time %lu", start_time , end_time );
+ if (unlikely(start_time_ut == end_time_ut && entries != 1)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut;
continue;
}
- if (unlikely(start_time == end_time)) {
- size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
- if (unlikely(entries > 1)) {
- error("Invalid page encountered, start time %lu = end time but %zu entries were found", start_time, entries);
- continue;
- }
+ if (unlikely(!entries)) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if(entries > 1 && update_every_s == 0) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut;
+ continue;
+ }
+
+ if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) {
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++;
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut)
+ ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut;
+
+ // let this be
+ // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1);
}
temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
@@ -340,7 +366,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id);
+ *PValue = page_index = create_page_index(temp_id, ctx);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
@@ -348,21 +374,32 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
descr = pg_cache_create_descr();
descr->page_length = jf_metric_data->descr[i].page_length;
- descr->start_time = start_time;
- descr->end_time = end_time;
+ descr->start_time_ut = start_time_ut;
+ descr->end_time_ut = end_time_ut;
+ descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
descr->id = &page_index->id;
descr->extent = extent;
descr->type = page_type;
extent->pages[valid_pages++] = descr;
pg_cache_insert(ctx, page_index, descr);
+
+ if(page_index->latest_time_ut == descr->end_time_ut)
+ page_index->latest_update_every_s = descr->update_every_s;
+
+ if(descr->update_every_s == 0)
+ fatal(
+ "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
+ (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
}
extent->number_of_pages = valid_pages;
if (likely(valid_pages))
df_extent_insert(extent);
- else
+ else {
freez(extent);
+ ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
+ }
}
/*
@@ -442,27 +479,30 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
//data_file_size = journalfile->datafile->pos; TODO: utilize this?
max_id = 1;
- ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ bool journal_is_mmapped = (journalfile->data != NULL);
+ if (unlikely(!journal_is_mmapped)) {
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret))
+ fatal("posix_memalign:%s", strerror(ret));
}
-
+ else
+ buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
- iov = uv_buf_init(buf, size_bytes);
- ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
- if (ret < 0) {
- error("uv_fs_read: pos=%"PRIu64", %s", pos, uv_strerror(ret));
+ if (unlikely(!journal_is_mmapped)) {
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto skip_file;
+ }
+ fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
- goto skip_file;
+ ++ctx->stats.io_read_requests;
+ ctx->stats.io_read_bytes += size_bytes;
}
- fatal_assert(req.result >= 0);
- uv_fs_req_cleanup(&req);
- ctx->stats.io_read_bytes += size_bytes;
- ++ctx->stats.io_read_requests;
- //pos_i = pos;
- //while (pos_i < pos + size_bytes) {
for (pos_i = 0 ; pos_i < size_bytes ; ) {
unsigned max_size;
@@ -475,9 +515,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
pos_i += ret;
max_id = MAX(max_id, id);
}
+ if (likely(journal_is_mmapped))
+ buf += size_bytes;
}
skip_file:
- free(buf);
+ if (unlikely(!journal_is_mmapped))
+ posix_memfree(buf);
return max_id;
}
@@ -512,12 +555,16 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
journalfile->file = file;
journalfile->pos = file_size;
+ journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0);
+ info("Loading journal file \"%s\" using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
max_id = iterate_transactions(ctx, journalfile);
ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
+ if (likely(journalfile->data))
+ netdata_munmap(journalfile->data, file_size);
return 0;
error:
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index f6c43cd16..011c5065f 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -19,7 +19,7 @@ struct rrdengine_journalfile;
struct rrdengine_journalfile {
uv_file file;
uint64_t pos;
-
+ void *data;
struct rrdengine_datafile *datafile;
};
@@ -33,17 +33,17 @@ struct transaction_commit_log {
unsigned buf_size;
};
-extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
-extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
-extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
-extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
-extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
-extern int unlink_journal_file(struct rrdengine_journalfile *journalfile);
-extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
-extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
-extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int unlink_journal_file(struct rrdengine_journalfile *journalfile);
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
+int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
struct rrdengine_datafile *datafile);
-extern void init_commit_log(struct rrdengine_instance *ctx);
+void init_commit_log(struct rrdengine_instance *ctx);
#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am
deleted file mode 100644
index 161784b8f..000000000
--- a/database/engine/metadata_log/Makefile.am
+++ /dev/null
@@ -1,8 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c
deleted file mode 100644
index ba19e1edf..000000000
--- a/database/engine/metadata_log/compaction.c
+++ /dev/null
@@ -1,86 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRD_INTERNALS
-
-#include "metadatalog.h"
-
-/* Return 0 on success. */
-int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
- unsigned *matched_files)
-{
- int ret;
- unsigned starting_fileno, fileno, i, j, recovered_files;
- struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles;
- char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
-
- for (i = 0 ; i < *matched_files ; ++i) {
- metalogfile = metalogfiles[i];
- if (0 == metalogfile->starting_fileno)
- continue; /* skip standard metadata log files */
- break; /* this is a compaction temporary file */
- }
- if (i == *matched_files) /* no recovery needed */
- return 0;
- info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
-
- if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */
- error("Metadata log files are in an invalid state. Cannot proceed.");
- return 1;
- }
- compactionfile = metalogfile;
- starting_fileno = compactionfile->starting_fileno;
- fileno = compactionfile->fileno;
- /* scratchpad space to move file pointers around */
- tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles));
-
- for (j = 0, recovered_files = 0 ; j < i ; ++j) {
- metalogfile = metalogfiles[j];
- fatal_assert(0 == metalogfile->starting_fileno);
- if (metalogfile->fileno < starting_fileno) {
- tmp_metalogfiles[recovered_files++] = metalogfile;
- continue;
- }
- break; /* reached compaction file serial number */
- }
-
- if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ ||
- (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) {
- error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
- METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno);
- unlink_metadata_logfile(compactionfile);
- freez(compactionfile);
- freez(tmp_metalogfiles);
- --*matched_files; /* delete the last one */
-
- info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
- return 0;
- }
-
- for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */
- metalogfile = metalogfiles[j];
- fatal_assert(0 == metalogfile->starting_fileno);
- if (metalogfile->fileno < fileno) { /* It has already been compacted */
- error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
- METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno);
- unlink_metadata_logfile(metalogfile);
- freez(metalogfile);
- continue;
- }
- tmp_metalogfiles[recovered_files++] = metalogfile;
- }
-
- /* compaction temporary file is valid */
- tmp_metalogfiles[recovered_files++] = compactionfile;
- ret = rename_metadata_logfile(compactionfile, 0, starting_fileno);
- if (ret < 0) {
- error("Cannot rename temporary compaction files. Cannot proceed.");
- freez(tmp_metalogfiles);
- return 1;
- }
-
- memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles));
- *matched_files = recovered_files;
- freez(tmp_metalogfiles);
-
- info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path);
- return 0;
-}
diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h
deleted file mode 100644
index d04613440..000000000
--- a/database/engine/metadata_log/compaction.h
+++ /dev/null
@@ -1,14 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_COMPACTION_H
-#define NETDATA_COMPACTION_H
-
-#ifndef _GNU_SOURCE
-#define _GNU_SOURCE
-#endif
-#include "../rrdengine.h"
-
-extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
- unsigned *matched_files);
-
-#endif /* NETDATA_COMPACTION_H */
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
deleted file mode 100644
index 07eb9b6fe..000000000
--- a/database/engine/metadata_log/logfile.c
+++ /dev/null
@@ -1,447 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#include <database/sqlite/sqlite_functions.h>
-#include "metadatalog.h"
-#include "metalogpluginsd.h"
-
-
-void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
-{
- (void) snprintfz(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION,
- metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
-}
-
-void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno,
- unsigned fileno)
-{
- metalogfile->starting_fileno = starting_fileno;
- metalogfile->fileno = fileno;
- metalogfile->file = (uv_file)0;
- metalogfile->pos = 0;
- metalogfile->next = NULL;
- metalogfile->ctx = ctx;
-}
-
-int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
-{
- //struct metalog_instance *ctx = metalogfile->ctx;
- uv_fs_t req;
- int ret;
- char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
- unsigned backup_starting_fileno, backup_fileno;
-
- backup_starting_fileno = metalogfile->starting_fileno;
- backup_fileno = metalogfile->fileno;
- generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath));
- metalogfile->starting_fileno = new_starting_fileno;
- metalogfile->fileno = new_fileno;
- generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath));
-
- info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath);
- ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
- if (ret < 0) {
- error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
- //++ctx->stats.fs_errors; /* this is racy, may miss some errors */
- rrd_stat_atomic_add(&global_fs_errors, 1);
- /* restore previous values */
- metalogfile->starting_fileno = backup_starting_fileno;
- metalogfile->fileno = backup_fileno;
- }
- uv_fs_req_cleanup(&req);
-
- return ret;
-}
-
-int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
-{
- //struct metalog_instance *ctx = metalogfile->ctx;
- uv_fs_t req;
- int ret;
- char path[RRDENG_PATH_MAX];
-
- generate_metadata_logfile_path(metalogfile, path, sizeof(path));
-
- ret = uv_fs_unlink(NULL, &req, path, NULL);
- if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
-// ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- uv_fs_req_cleanup(&req);
-
- return ret;
-}
-
-static int check_metadata_logfile_superblock(uv_file file)
-{
- int ret;
- struct rrdeng_metalog_sb *superblock;
- uv_buf_t iov;
- uv_fs_t req;
-
- ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- }
- iov = uv_buf_init((void *)superblock, sizeof(*superblock));
-
- ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
- if (ret < 0) {
- error("uv_fs_read: %s", uv_strerror(ret));
- uv_fs_req_cleanup(&req);
- goto error;
- }
- fatal_assert(req.result >= 0);
- uv_fs_req_cleanup(&req);
-
- if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) {
- error("File has invalid superblock.");
- ret = UV_EINVAL;
- } else {
- ret = 0;
- }
- if (superblock->version > RRDENG_METALOG_VER) {
- error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version);
- }
-error:
- free(superblock);
- return ret;
-}
-
-void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload)
-{
- struct metalog_instance *ctx = metalogfile->ctx;
- char *line, *nextline, *record_end;
- int ret;
-
- debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload);
- record_end = (char *)payload + header->payload_length - 1;
- *record_end = '\0';
-
- for (line = payload ; line ; line = nextline) {
- nextline = strchr(line, '\n');
- if (nextline) {
- *nextline++ = '\0';
- }
- ret = parser_action(ctx->metalog_parser_object->parser, line);
- debug(D_METADATALOG, "parser_action ret:%d", ret);
- if (ret)
- return; /* skip record due to error */
- };
-}
-
-/* This function only works with buffered I/O */
-static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
-{
-// struct metalog_instance *ctx;
- uv_file file;
- uv_buf_t iov;
- uv_fs_t req;
- int ret;
-
-// ctx = metalogfile->ctx;
- file = metalogfile->file;
- iov = uv_buf_init(buf, len);
- ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL);
- if (unlikely(ret < 0 && ret != req.result)) {
- fatal("uv_fs_read: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
-// ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
- error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
- uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
- }
- uv_fs_req_cleanup(&req);
-// ctx->stats.io_read_bytes += len;
-// ++ctx->stats.io_read_requests;
-
- return ret;
-}
-
-/* Return 0 on success */
-static int metadata_record_integrity_check(void *record)
-{
- int ret;
- uint32_t data_size;
- struct rrdeng_metalog_record_header *header;
- struct rrdeng_metalog_record_trailer *trailer;
- uLong crc;
-
- header = record;
- data_size = header->header_length + header->payload_length;
- trailer = record + data_size;
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, record, data_size);
- ret = crc32cmp(trailer->checksum, crc);
-
- return ret;
-}
-
-#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */
-
-/*
- * Iterates metadata log file records and creates database objects (host/chart/dimension)
- */
-static void iterate_records(struct metadata_logfile *metalogfile)
-{
- uint32_t file_size, pos, bytes_remaining, record_size;
- void *buf;
- struct rrdeng_metalog_record_header *header;
- struct metalog_instance *ctx = metalogfile->ctx;
- struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private;
- const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) +
- sizeof(header->header_length);
-
- file_size = metalogfile->pos;
- state->metalogfile = metalogfile;
-
- buf = mallocz(MAX_READ_BYTES);
-
- for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) {
- bytes_remaining = file_size - pos;
- if (bytes_remaining < min_header_size) {
- error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
- metalogfile->fileno);
- break;
- }
- if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0)
- break;
- header = (struct rrdeng_metalog_record_header *)buf;
- if (METALOG_STORE_PADDING == header->type) {
- info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
- metalogfile->fileno);
- record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos;
- continue;
- }
- if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size,
- pos + min_header_size) < 0)
- break;
- record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer);
- if (header->header_length < min_header_size || record_size > bytes_remaining) {
- error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
- metalogfile->fileno);
- break;
- }
- if (record_size > MAX_READ_BYTES) {
- error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size,
- metalogfile->starting_fileno, metalogfile->fileno);
- continue;
- }
- if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header),
- pos + sizeof(*header)) < 0)
- break;
- if (metadata_record_integrity_check(buf)) {
- error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos);
- continue;
- }
- debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__,
- pos);
-
- replay_record(metalogfile, header, buf + header->header_length);
- }
-
- freez(buf);
-}
-
-int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
-{
- UNUSED(ctx);
- uv_fs_t req;
- uv_file file;
- int ret, fd, error;
- uint64_t file_size;
- char path[RRDENG_PATH_MAX];
-
- generate_metadata_logfile_path(metalogfile, path, sizeof(path));
- if (file_is_migrated(path))
- return 0;
-
- fd = open_file_buffered_io(path, O_RDWR, &file);
- if (fd < 0) {
-// ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- return fd;
- }
- info("Loading metadata log \"%s\".", path);
-
- ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb));
- if (ret)
- goto error;
-
- ret = check_metadata_logfile_superblock(file);
- if (ret)
- goto error;
-// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
-// ++ctx->stats.io_read_requests;
-
- metalogfile->file = file;
- metalogfile->pos = file_size;
-
- iterate_records(metalogfile);
-
- info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size);
- add_migrated_file(path, file_size);
- return 0;
-
-error:
- error = ret;
- ret = uv_fs_close(NULL, &req, file, NULL);
- if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
-// ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- uv_fs_req_cleanup(&req);
- return error;
-}
-
-static int scan_metalog_files_cmp(const void *a, const void *b)
-{
- struct metadata_logfile *file1, *file2;
- char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
-
- file1 = *(struct metadata_logfile **)a;
- file2 = *(struct metadata_logfile **)b;
- generate_metadata_logfile_path(file1, path1, sizeof(path1));
- generate_metadata_logfile_path(file2, path2, sizeof(path2));
- return strcmp(path1, path2);
-}
-
-/* Returns number of metadata logfiles that were loaded or < 0 on error */
-static int scan_metalog_files(struct metalog_instance *ctx)
-{
- int ret;
- unsigned starting_no, no, matched_files, i, failed_to_load;
- static uv_fs_t req;
- uv_dirent_t dent;
- struct metadata_logfile **metalogfiles, *metalogfile;
- char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
-
- ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
- if (ret < 0) {
- fatal_assert(req.result < 0);
- uv_fs_req_cleanup(&req);
- error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
-// ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- return ret;
- }
- info("Found %d files in path %s", ret, dbfiles_path);
-
- metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles));
- for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s/%s\"", dbfiles_path, dent.name);
- ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no);
- if (2 == ret) {
- info("Matched file \"%s/%s\"", dbfiles_path, dent.name);
- metalogfile = mallocz(sizeof(*metalogfile));
- metadata_logfile_init(metalogfile, ctx, starting_no, no);
- metalogfiles[matched_files++] = metalogfile;
- }
- }
- uv_fs_req_cleanup(&req);
-
- if (0 == matched_files) {
- freez(metalogfiles);
- return 0;
- }
- if (matched_files == MAX_DATAFILES) {
- error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
- }
- qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp);
- ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files);
- if (ret) { /* If the files are corrupted fail */
- for (i = 0 ; i < matched_files ; ++i) {
- freez(metalogfiles[i]);
- }
- freez(metalogfiles);
- return UV_EINVAL;
- }
- //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
-
- struct plugind cd = {
- .enabled = 1,
- .update_every = 0,
- .pid = 0,
- .serial_failures = 0,
- .successful_collections = 0,
- .obsolete = 0,
- .started_t = INVALID_TIME,
- .next = NULL,
- .version = 0,
- };
-
- struct metalog_pluginsd_state metalog_parser_state;
- metalog_pluginsd_state_init(&metalog_parser_state, ctx);
-
- PARSER_USER_OBJECT metalog_parser_object = {
- .enabled = cd.enabled,
- .host = ctx->rrdeng_ctx->host,
- .cd = &cd,
- .trust_durations = 0,
- .private = &metalog_parser_state
- };
-
- PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone);
- parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action;
- parser->plugins_action->chart_action = &metalog_pluginsd_chart_action;
- parser->plugins_action->guid_action = &metalog_pluginsd_guid_action;
- parser->plugins_action->context_action = &metalog_pluginsd_context_action;
- parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action;
- parser->plugins_action->host_action = &metalog_pluginsd_host_action;
-
-
- metalog_parser_object.parser = parser;
- ctx->metalog_parser_object = &metalog_parser_object;
-
- for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
- metalogfile = metalogfiles[i];
- db_lock();
- db_execute("BEGIN TRANSACTION;");
- ret = load_metadata_logfile(ctx, metalogfile);
- if (0 != ret) {
- error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
- METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
- unlink_metadata_logfile(metalogfile);
- ++failed_to_load;
- db_execute("ROLLBACK TRANSACTION;");
- }
- else
- db_execute("COMMIT TRANSACTION;");
- db_unlock();
- freez(metalogfile);
- }
- matched_files -= failed_to_load;
- debug(D_METADATALOG, "PARSER ended");
-
- parser_destroy(parser);
-
- size_t count __maybe_unused = metalog_parser_object.count;
-
- debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
-
- freez(metalogfiles);
- return matched_files;
-}
-
-/* Return 0 on success. */
-int init_metalog_files(struct metalog_instance *ctx)
-{
- int ret;
- char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
-
- ret = scan_metalog_files(ctx);
- if (ret < 0) {
- error("Failed to scan path \"%s\".", dbfiles_path);
- return ret;
- }/* else if (0 == ret) {
- ctx->last_fileno = 1;
- }*/
-
- return 0;
-}
diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h
deleted file mode 100644
index df12ac714..000000000
--- a/database/engine/metadata_log/logfile.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_LOGFILE_H
-#define NETDATA_LOGFILE_H
-
-#include "metadatalogprotocol.h"
-#include "../rrdengine.h"
-
-/* Forward declarations */
-struct metadata_logfile;
-struct metalog_worker_config;
-
-#define METALOG_PREFIX "metadatalog-"
-#define METALOG_EXTENSION ".mlf"
-
-/* only one event loop is supported for now */
-struct metadata_logfile {
- unsigned fileno; /* Starts at 1 */
- unsigned starting_fileno; /* 0 for normal files, staring number during compaction */
- uv_file file;
- uint64_t pos;
- struct metalog_instance *ctx;
- struct metadata_logfile *next;
-};
-
-struct metadata_logfile_list {
- struct metadata_logfile *first; /* oldest */
- struct metadata_logfile *last; /* newest */
-};
-
-extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen);
-extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno,
- unsigned new_fileno);
-extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile);
-extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile);
-extern int init_metalog_files(struct metalog_instance *ctx);
-
-
-#endif /* NETDATA_LOGFILE_H */
diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h
deleted file mode 100644
index 483036a91..000000000
--- a/database/engine/metadata_log/metadatalog.h
+++ /dev/null
@@ -1,28 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_METADATALOG_H
-#define NETDATA_METADATALOG_H
-
-#ifndef _GNU_SOURCE
-#define _GNU_SOURCE
-#endif
-#include "../rrdengine.h"
-#include "metadatalogprotocol.h"
-#include "logfile.h"
-#include "metadatalogapi.h"
-#include "compaction.h"
-
-/* Forward declarations */
-struct metalog_instance;
-struct parser_user_object;
-
-#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u"
-#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u"
-
-struct metalog_instance {
- struct rrdengine_instance *rrdeng_ctx;
- struct parser_user_object *metalog_parser_object;
- uint8_t initialized; /* set to 1 to mark context initialized */
-};
-
-#endif /* NETDATA_METADATALOG_H */
diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c
deleted file mode 100755
index b206cca05..000000000
--- a/database/engine/metadata_log/metadatalogapi.c
+++ /dev/null
@@ -1,39 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRD_INTERNALS
-
-#include "metadatalog.h"
-
-/*
- * Returns 0 on success, negative on error
- */
-int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx)
-{
- struct metalog_instance *ctx;
- int error;
-
- ctx = callocz(1, sizeof(*ctx));
- ctx->initialized = 0;
- rrdeng_parent_ctx->metalog_ctx = ctx;
-
- ctx->rrdeng_ctx = rrdeng_parent_ctx;
- error = init_metalog_files(ctx);
- if (error) {
- goto error_after_init_rrd_files;
- }
- ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */
- return 0;
-
-error_after_init_rrd_files:
- freez(ctx);
- return UV_EIO;
-}
-
-/* This function is called by dbengine rotation logic when the metric has no writers */
-void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
-{
- uuid_t multihost_uuid;
-
- delete_dimension_uuid(metric_uuid);
- rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
- delete_dimension_uuid(&multihost_uuid);
-}
diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h
deleted file mode 100644
index d558b9317..000000000
--- a/database/engine/metadata_log/metadatalogapi.h
+++ /dev/null
@@ -1,12 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_METADATALOGAPI_H
-#define NETDATA_METADATALOGAPI_H
-
-extern void metalog_commit_delete_chart(RRDSET *st);
-extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
-
-/* must call once before using anything */
-extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx);
-
-#endif /* NETDATA_METADATALOGAPI_H */
diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h
deleted file mode 100644
index 1017213ae..000000000
--- a/database/engine/metadata_log/metadatalogprotocol.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_METADATALOGPROTOCOL_H
-#define NETDATA_METADATALOGPROTOCOL_H
-
-#include "../rrddiskprotocol.h"
-
-#define RRDENG_METALOG_MAGIC "netdata-metadata-log"
-
-#define RRDENG_METALOG_VER (1)
-
-#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t)))
-/*
- * Metadata log persistent super-block
- */
-struct rrdeng_metalog_sb {
- char magic_number[RRDENG_MAGIC_SZ];
- uint16_t version;
- uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ];
-} __attribute__ ((packed));
-
-/*
- * Metadata log record types
- */
-#define METALOG_STORE_PADDING (0)
-#define METALOG_CREATE_OBJECT (1)
-#define METALOG_DELETE_OBJECT (2)
-#define METALOG_OTHER (3) /* reserved */
-
-/*
- * Metadata log record header
- */
-struct rrdeng_metalog_record_header {
- /* when set to METALOG_STORE_PADDING jump to start of next block */
- uint8_t type;
-
- uint16_t header_length;
- uint32_t payload_length;
- /******************************************************
- * No fields above this point can ever change. *
- ******************************************************
- * All fields below this point are subject to change. *
- ******************************************************/
-} __attribute__ ((packed));
-
-/*
- * Metadata log record trailer
- */
-struct rrdeng_metalog_record_trailer {
- uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */
-} __attribute__ ((packed));
-
-#endif /* NETDATA_METADATALOGPROTOCOL_H */
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c
deleted file mode 100755
index a5301bc10..000000000
--- a/database/engine/metadata_log/metalogpluginsd.c
+++ /dev/null
@@ -1,140 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRD_INTERNALS
-
-#include "metadatalog.h"
-#include "metalogpluginsd.h"
-
-extern struct config stream_config;
-
-PARSER_RC metalog_pluginsd_host_action(
- void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone,
- char *tags)
-{
- struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
-
- RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
- if (host) {
- if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) {
- error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.",
- host->hostname, rrd_memory_mode_name(host->rrd_memory_mode),
- rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE));
- ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */
- }
- ((PARSER_USER_OBJECT *) user)->host = host;
- return PARSER_RC_OK;
- }
-
- if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
- ((PARSER_USER_OBJECT *) user)->host = host;
- return PARSER_RC_OK;
- }
-
- if (likely(!uuid_parse(machine_guid, state->host_uuid))) {
- int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags, 1);
- if (unlikely(rc)) {
- errno = 0;
- error("Failed to store host %s with UUID %s in the database", hostname, machine_guid);
- }
- }
- else {
- errno = 0;
- error("Host machine GUID %s is not valid", machine_guid);
- }
-
- return PARSER_RC_OK;
-}
-
-PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context,
- char *title, char *units, char *plugin, char *module, int priority,
- int update_every, RRDSET_TYPE chart_type, char *options)
-{
- UNUSED(options);
-
- struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
-
- if (unlikely(uuid_is_null(state->host_uuid))) {
- debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host.");
- return PARSER_RC_OK;
- }
- uuid_copy(state->chart_uuid, state->uuid);
- uuid_clear(state->uuid); /* Consume UUID */
- (void) sql_store_chart(&state->chart_uuid, &state->host_uuid,
- type, id, name, family, context, title, units,
- plugin, module, priority, update_every,
- chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1);
- ((PARSER_USER_OBJECT *)user)->st_exists = 1;
-
- return PARSER_RC_OK;
-}
-
-PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
- long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type)
-{
- struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
- UNUSED(user);
- UNUSED(options);
- UNUSED(algorithm);
- UNUSED(st);
-
- if (unlikely(uuid_is_null(state->chart_uuid))) {
- debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart.");
- info("Ignoring dimension belonging to missing or ignored chart.");
- return PARSER_RC_OK;
- }
-
- if (unlikely(uuid_is_null(state->uuid))) {
- debug(D_METADATALOG, "Ignoring dimension without unknown UUID");
- info("Ignoring dimension without unknown UUID");
- return PARSER_RC_OK;
- }
-
- (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type);
- uuid_clear(state->uuid); /* Consume UUID */
-
- return PARSER_RC_OK;
-}
-
-PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid)
-{
- struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
-
- uuid_copy(state->uuid, *uuid);
-
- return PARSER_RC_OK;
-}
-
-PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid)
-{
- struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
-
- int rc = find_uuid_type(uuid);
-
- if (rc == 1) {
- uuid_copy(state->host_uuid, *uuid);
- ((PARSER_USER_OBJECT *)user)->st_exists = 0;
- ((PARSER_USER_OBJECT *)user)->host_exists = 1;
- } else if (rc == 2) {
- uuid_copy(state->chart_uuid, *uuid);
- ((PARSER_USER_OBJECT *)user)->st_exists = 1;
- } else
- uuid_copy(state->uuid, *uuid);
-
- return PARSER_RC_OK;
-}
-
-PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid)
-{
- UNUSED(user);
- UNUSED(uuid);
-
- return PARSER_RC_OK;
-}
-
-void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx)
-{
- state->ctx = ctx;
- state->skip_record = 0;
- uuid_clear(state->uuid);
- state->metalogfile = NULL;
-}
diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h
deleted file mode 100644
index 4fd8c3900..000000000
--- a/database/engine/metadata_log/metalogpluginsd.h
+++ /dev/null
@@ -1,33 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_METALOGPLUGINSD_H
-#define NETDATA_METALOGPLUGINSD_H
-
-#include "collectors/plugins.d/pluginsd_parser.h"
-#include "collectors/plugins.d/plugins_d.h"
-#include "parser/parser.h"
-
-struct metalog_pluginsd_state {
- struct metalog_instance *ctx;
- uuid_t uuid;
- uuid_t host_uuid;
- uuid_t chart_uuid;
- uint8_t skip_record; /* skip this record due to errors in parsing */
- struct metadata_logfile *metalogfile; /* current metadata log file being replayed */
-};
-
-extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx);
-
-extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family,
- char *context, char *title, char *units, char *plugin, char *module,
- int priority, int update_every, RRDSET_TYPE chart_type, char *options);
-extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm,
- long multiplier, long divisor, char *options,
- RRD_ALGORITHM algorithm_type);
-extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid);
-extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid);
-extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid);
-extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags);
-
-#endif /* NETDATA_METALOGPLUGINSD_H */
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 39f7642d0..d65cb35a5 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -4,8 +4,8 @@
#include "rrdengine.h"
ARAL page_descr_aral = {
- .element_size = sizeof(struct rrdeng_page_descr),
- .elements = 20000,
+ .requested_element_size = sizeof(struct rrdeng_page_descr),
+ .initial_elements = 20000,
.filename = "page_descriptors",
.cache_dir = &netdata_configured_cache_dir,
.use_mmap = false,
@@ -127,12 +127,13 @@ struct rrdeng_page_descr *pg_cache_create_descr(void)
descr = rrdeng_page_descr_mallocz();
descr->page_length = 0;
- descr->start_time = INVALID_TIME;
- descr->end_time = INVALID_TIME;
+ descr->start_time_ut = INVALID_TIME;
+ descr->end_time_ut = INVALID_TIME;
descr->id = NULL;
descr->extent = NULL;
descr->pg_cache_descr_state = 0;
descr->pg_cache_descr = NULL;
+ descr->update_every_s = 0;
return descr;
}
@@ -476,7 +477,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
uv_rwlock_wrlock(&page_index->lock);
- ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
if (unlikely(0 == ret)) {
uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(debug_flags & D_RRDENGINE)) {
@@ -506,7 +507,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
pg_cache_wait_event_unsafe(descr);
}
}
@@ -517,7 +518,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
pg_cache_wait_event_unsafe(descr);
}
}
@@ -548,8 +549,8 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t
{
usec_t pg_start, pg_end;
- pg_start = descr->start_time;
- pg_end = descr->end_time;
+ pg_start = descr->start_time_ut;
+ pg_end = descr->end_time_ut;
return (pg_start < start_time && pg_end >= start_time) ||
(pg_start >= start_time && pg_start <= end_time);
@@ -557,7 +558,7 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t
static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
- return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
+ return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut);
}
/* The caller must hold the page index lock */
@@ -592,14 +593,14 @@ static inline struct rrdeng_page_descr *
/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
- usec_t oldest_time = page_index->oldest_time;
- usec_t latest_time = page_index->latest_time;
+ usec_t oldest_time = page_index->oldest_time_ut;
+ usec_t latest_time = page_index->latest_time_ut;
- if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
- page_index->oldest_time = descr->start_time;
+ if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) {
+ page_index->oldest_time_ut = descr->start_time_ut;
}
- if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
- page_index->latest_time = descr->end_time;
+ if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) {
+ page_index->latest_time_ut = descr->end_time_ut;
}
}
@@ -618,23 +619,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
if (likely(NULL != firstPValue)) {
descr = *firstPValue;
- oldest_time = descr->start_time;
+ oldest_time = descr->start_time_ut;
}
lastIndex = (Word_t)-1;
lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
if (likely(NULL != lastPValue)) {
descr = *lastPValue;
- latest_time = descr->end_time;
+ latest_time = descr->end_time_ut;
}
uv_rwlock_rdunlock(&page_index->lock);
if (unlikely(NULL == firstPValue)) {
fatal_assert(NULL == lastPValue);
- page_index->oldest_time = page_index->latest_time = INVALID_TIME;
+ page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME;
return;
}
- page_index->oldest_time = oldest_time;
- page_index->latest_time = latest_time;
+ page_index->oldest_time_ut = oldest_time;
+ page_index->latest_time_ut = latest_time;
}
/* If index is NULL lookup by UUID (descr->id) */
@@ -669,7 +670,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
}
uv_rwlock_wrlock(&page_index->lock);
- PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
+ PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
*PValue = descr;
++page_index->page_count;
pg_cache_add_new_metric_time(page_index, descr);
@@ -681,7 +682,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
-usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
@@ -699,25 +700,25 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
}
uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
if (NULL == descr) {
uv_rwlock_rdunlock(&page_index->lock);
return INVALID_TIME;
}
uv_rwlock_rdunlock(&page_index->lock);
- return descr->start_time;
+ return descr->start_time_ut;
}
/**
* Return page information for the first page before point_in_time that satisfies the filter.
* @param ctx DB context
* @param page_index page index of a metric
- * @param point_in_time the pages that are searched must be older than this timestamp
+ * @param point_in_time_ut the pages that are searched must be older than this timestamp
* @param filter decides if the page satisfies the caller's criteria
* @param page_info the result of the search is set in this pointer
*/
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
- usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+ usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter,
struct rrdeng_page_info *page_info)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -728,7 +729,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
(void)pg_cache;
fatal_assert(NULL != page_index);
- Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
uv_rwlock_rdlock(&page_index->lock);
do {
PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
@@ -736,12 +737,12 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
} while (descr != NULL && !filter(descr));
if (unlikely(NULL == descr)) {
page_info->page_length = 0;
- page_info->start_time = INVALID_TIME;
- page_info->end_time = INVALID_TIME;
+ page_info->start_time_ut = INVALID_TIME;
+ page_info->end_time_ut = INVALID_TIME;
} else {
page_info->page_length = descr->page_length;
- page_info->start_time = descr->start_time;
- page_info->end_time = descr->end_time;
+ page_info->start_time_ut = descr->start_time_ut;
+ page_info->end_time_ut = descr->end_time_ut;
}
uv_rwlock_rdunlock(&page_index->lock);
}
@@ -750,7 +751,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
* Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
* @param ctx DB context
* @param id lookup by UUID
- * @param start_time exact starting time in usec
+ * @param start_time_ut exact starting time in usec
* @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
* @return the page descriptor or NULL on failure. It can fail if:
* 1. The page is already allocated to the page cache.
@@ -758,7 +759,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
* 3. It did not succeed to reserve a spot in the page cache.
*/
struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
- usec_t start_time)
+ usec_t start_time_ut)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
@@ -781,7 +782,7 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_
}
uv_rwlock_rdlock(&page_index->lock);
- Index = (Word_t)(start_time / USEC_PER_SEC);
+ Index = (Word_t)(start_time_ut / USEC_PER_SEC);
PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
if (likely(NULL != PValue)) {
descr = *PValue;
@@ -818,15 +819,15 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_
* Does not get a reference.
* @param ctx DB context
* @param id UUID
- * @param start_time inclusive starting time in usec
- * @param end_time inclusive ending time in usec
+ * @param start_time_ut inclusive starting time in usec
+ * @param end_time_ut inclusive ending time in usec
* @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap
* with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
* If page_info_arrayp is set to NULL nothing was allocated.
* @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
* @return the number of pages that overlap with the time range [start_time,end_time].
*/
-unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut,
struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -854,14 +855,14 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
}
uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
if (NULL == descr) {
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
*ret_page_indexp = NULL;
return 0;
} else {
- Index = (Word_t)(descr->start_time / USEC_PER_SEC);
+ Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC);
}
if (page_info_arrayp) {
page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
@@ -869,7 +870,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
}
for (count = 0, preload_count = 0 ;
- descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
+ descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ;
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
/* Iterate all pages in range */
@@ -881,8 +882,8 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
*page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
}
- (*page_info_arrayp)[count].start_time = descr->start_time;
- (*page_info_arrayp)[count].end_time = descr->end_time;
+ (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut;
+ (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut;
(*page_info_arrayp)[count].page_length = descr->page_length;
}
++count;
@@ -974,7 +975,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
*/
struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t point_in_time)
+ usec_t point_in_time_ut)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
@@ -1003,15 +1004,15 @@ struct rrdeng_page_descr *
page_not_in_cache = 0;
uv_rwlock_rdlock(&page_index->lock);
while (1) {
- Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
if (likely(NULL != PValue)) {
descr = *PValue;
}
if (NULL == PValue ||
0 == descr->page_length ||
- (INVALID_TIME != point_in_time &&
- !is_point_in_time_in_page(descr, point_in_time))) {
+ (INVALID_TIME != point_in_time_ut &&
+ !is_point_in_time_in_page(descr, point_in_time_ut))) {
/* non-empty page not found */
uv_rwlock_rdunlock(&page_index->lock);
@@ -1038,7 +1039,7 @@ struct rrdeng_page_descr *
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
pg_cache_wait_event_unsafe(descr);
}
@@ -1053,7 +1054,7 @@ struct rrdeng_page_descr *
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
pg_cache_wait_event_unsafe(descr);
@@ -1081,7 +1082,7 @@ struct rrdeng_page_descr *
*/
struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t start_time, usec_t end_time)
+ usec_t start_time_ut, usec_t end_time_ut)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
@@ -1110,7 +1111,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_rdlock(&page_index->lock);
int retry_count = 0;
while (1) {
- descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
/* non-empty page not found */
if (retry_count == default_rrdeng_page_fetch_retries)
@@ -1140,7 +1141,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
pg_cache_wait_event_unsafe(descr);
}
@@ -1155,7 +1156,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
@@ -1180,7 +1181,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
return descr;
}
-struct pg_cache_page_index *create_page_index(uuid_t *id)
+struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx)
{
struct pg_cache_page_index *page_index;
@@ -1188,11 +1189,15 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
page_index->JudyL_array = (Pvoid_t) NULL;
uuid_copy(page_index->id, *id);
fatal_assert(0 == uv_rwlock_init(&page_index->lock));
- page_index->oldest_time = INVALID_TIME;
- page_index->latest_time = INVALID_TIME;
+ page_index->oldest_time_ut = INVALID_TIME;
+ page_index->latest_time_ut = INVALID_TIME;
page_index->prev = NULL;
page_index->page_count = 0;
+ page_index->refcount = 0;
page_index->writers = 0;
+ page_index->ctx = ctx;
+ page_index->alignment = NULL;
+ page_index->latest_update_every_s = default_rrd_update_every;
return page_index;
}
@@ -1238,24 +1243,6 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_committed_page_index(ctx);
}
-
-
-/*
- * METRIC # number
- * 1. INDEX: JudyHS # bytes
- * 2. DATA: page_index # bytes
- *
- * PAGE (1 page of 1 metric) # number
- * 1. INDEX AT METRIC: page_index->JudyL_array # bytes
- * 2. DATA: descr # bytes
- *
- * PAGE CACHE (1 page of 1 metric at the cache) # number
- * 1. pg_cache_descr (if PG_CACHE_DESCR_ALLOCATED) # bytes
- * 2. data (if RRD_PAGE_POPULATED) # bytes
- *
- */
-
-
void free_page_cache(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -1265,30 +1252,15 @@ void free_page_cache(struct rrdengine_instance *ctx)
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- Word_t metrics_number = 0,
- metrics_bytes = 0,
- metrics_index_bytes = 0,
- metrics_duration = 0;
-
- Word_t pages_number = 0,
- pages_bytes = 0,
- pages_index_bytes = 0;
-
- Word_t pages_size_per_type[256] = { 0 },
- pages_count_per_type[256] = { 0 };
-
- Word_t cache_pages_number = 0,
- cache_pages_bytes = 0,
- cache_pages_data_bytes = 0;
-
- size_t points_in_db = 0,
- uncompressed_points_size = 0,
- seconds_in_db = 0,
- single_point_pages = 0;
-
- Word_t pages_dirty_index_bytes = 0;
-
- usec_t oldest_time_ut = LONG_MAX, latest_time_ut = 0;
+ // if we are exiting, the OS will recover all memory so do not slow down the shutdown process
+ // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS
+ // This affects the reporting of dbengine statistics which are available in real time
+ // via the /api/v1/dbengine_stats endpoint
+#ifndef NETDATA_DBENGINE_FREE
+ if (netdata_exit)
+ return;
+#endif
+ Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0;
/* Free committed page index */
pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
@@ -1305,116 +1277,30 @@ void free_page_cache(struct rrdengine_instance *ctx)
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
descr = unlikely(NULL == PValue) ? NULL : *PValue;
- size_t metric_duration = 0;
- size_t metric_update_every = 0;
- size_t metric_single_point_pages = 0;
-
while (descr != NULL) {
/* Iterate all page descriptors of this metric */
if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
- cache_pages_number++;
-
/* Check rrdenglocking.c */
pg_cache_descr = descr->pg_cache_descr;
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
dbengine_page_free(pg_cache_descr->page);
- cache_pages_data_bytes += RRDENG_BLOCK_SIZE;
}
rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
- cache_pages_bytes += sizeof(*pg_cache_descr);
}
-
- if(descr->start_time < oldest_time_ut)
- oldest_time_ut = descr->start_time;
-
- if(descr->end_time > latest_time_ut)
- latest_time_ut = descr->end_time;
-
- pages_size_per_type[descr->type] += descr->page_length;
- pages_count_per_type[descr->type]++;
-
- size_t points_in_page = (descr->page_length / PAGE_POINT_SIZE_BYTES(descr));
- size_t page_duration = ((descr->end_time - descr->start_time) / USEC_PER_SEC);
- size_t update_every = (page_duration == 0) ? 1 : page_duration / (points_in_page - 1);
-
- if (!page_duration && metric_update_every) {
- page_duration = metric_update_every;
- update_every = metric_update_every;
- }
- else if(page_duration)
- metric_update_every = update_every;
-
- uncompressed_points_size += descr->page_length;
-
- if(page_duration > 0) {
- page_duration = update_every * points_in_page;
- metric_duration += page_duration;
- seconds_in_db += page_duration;
- points_in_db += descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
- }
- else
- metric_single_point_pages++;
-
rrdeng_page_descr_freez(descr);
- pages_bytes += sizeof(*descr);
- pages_number++;
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
descr = unlikely(NULL == PValue) ? NULL : *PValue;
}
- if(metric_single_point_pages && metric_update_every) {
- points_in_db += metric_single_point_pages;
- seconds_in_db += metric_update_every * metric_single_point_pages;
- metric_duration += metric_update_every * metric_single_point_pages;
- }
- else
- single_point_pages += metric_single_point_pages;
-
/* Free page index */
pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0);
fatal_assert(NULL == page_index->JudyL_array);
freez(page_index);
-
- metrics_number++;
- metrics_bytes += sizeof(*page_index);
- metrics_duration += metric_duration;
}
/* Free metrics index */
metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
-
- if(!metrics_number) metrics_number = 1;
- if(!pages_number) pages_number = 1;
- if(!cache_pages_number) cache_pages_number = 1;
- if(!points_in_db) points_in_db = 1;
- if(latest_time_ut == oldest_time_ut) oldest_time_ut -= USEC_PER_SEC;
-
- if(single_point_pages) {
- long double avg_duration = (long double)seconds_in_db / points_in_db;
- points_in_db += single_point_pages;
- seconds_in_db += (size_t)(avg_duration * single_point_pages);
- }
-
- info("DBENGINE STATISTICS ON METRICS:"
- " Metrics: %lu (structures %lu bytes - per metric %0.2f, index (HS) %lu bytes - per metric %0.2f bytes - duration %zu secs) |"
- " Page descriptors: %lu (structures %lu bytes - per page %0.2f bytes, index (L) %lu bytes - per page %0.2f, dirty index %lu bytes). |"
- " Page cache: %lu pages (structures %lu bytes - per page %0.2f bytes, data %lu bytes). |"
- " Points in db %zu, uncompressed size of points database %zu bytes. |"
- " Duration of all points %zu seconds, average point duration %0.2f seconds."
- " Duration of the database %llu seconds, average metric duration %0.2f seconds, average metric lifetime %0.2f%%."
- , metrics_number, metrics_bytes, (double)metrics_bytes/metrics_number, metrics_index_bytes, (double)metrics_index_bytes/metrics_number, metrics_duration
- , pages_number, pages_bytes, (double)pages_bytes/pages_number, pages_index_bytes, (double)pages_index_bytes/pages_number, pages_dirty_index_bytes
- , cache_pages_number, cache_pages_bytes, (double)cache_pages_bytes/cache_pages_number, cache_pages_data_bytes
- , points_in_db, uncompressed_points_size
- , seconds_in_db, (double)seconds_in_db/points_in_db
- , (latest_time_ut - oldest_time_ut) / USEC_PER_SEC, (double)metrics_duration/metrics_number
- , (double)metrics_duration/metrics_number * 100.0 / ((latest_time_ut - oldest_time_ut) / USEC_PER_SEC)
- );
-
- for(int i = 0; i < 256 ;i++) {
- if(pages_count_per_type[i])
- info("DBENGINE STATISTICS ON PAGE TYPES: page type %d total pages %lu, average page size %0.2f bytes", i, pages_count_per_type[i], (double)pages_size_per_type[i]/pages_count_per_type[i]);
- }
+ info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes);
}
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index b938b9e05..2f4d6b332 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -60,18 +60,19 @@ struct rrdeng_page_descr {
volatile unsigned long pg_cache_descr_state;
/* page information */
- usec_t start_time;
- usec_t end_time;
- uint32_t page_length;
+ usec_t start_time_ut;
+ usec_t end_time_ut;
+ uint32_t update_every_s:24;
uint8_t type;
+ uint32_t page_length;
};
#define PAGE_INFO_SCRATCH_SZ (8)
struct rrdeng_page_info {
uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */
- usec_t start_time;
- usec_t end_time;
+ usec_t start_time_ut;
+ usec_t end_time_ut;
uint32_t page_length;
};
@@ -80,6 +81,11 @@ typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *);
#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
+struct pg_alignment {
+ uint32_t page_length;
+ uint32_t refcount;
+};
+
/* maps time ranges to pages */
struct pg_cache_page_index {
uuid_t id;
@@ -89,6 +95,7 @@ struct pg_cache_page_index {
*/
Pvoid_t JudyL_array;
Word_t page_count;
+ unsigned short refcount;
unsigned short writers;
uv_rwlock_t lock;
@@ -96,13 +103,17 @@ struct pg_cache_page_index {
* Only one effective writer, data deletion workqueue.
* It's also written during the DB loading phase.
*/
- usec_t oldest_time;
+ usec_t oldest_time_ut;
/*
* Only one effective writer, data collection thread.
* It's also written by the data deletion workqueue when data collection is disabled for this metric.
*/
- usec_t latest_time;
+ usec_t latest_time_ut;
+
+ struct rrdengine_instance *ctx;
+ struct pg_alignment *alignment;
+ uint32_t latest_update_every_s;
struct pg_cache_page_index *prev;
};
@@ -152,93 +163,93 @@ struct page_cache { /* TODO: add statistics */
unsigned populated_pages;
};
-extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
-extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
-extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
-extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
-extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr);
-extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
+void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr);
-extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
+void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr);
-extern struct rrdeng_page_descr *pg_cache_create_descr(void);
-extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access);
-extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
-extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
-extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
+struct rrdeng_page_descr *pg_cache_create_descr(void);
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access);
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
struct rrdeng_page_descr *descr);
-extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id);
-extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
- usec_t start_time, usec_t end_time);
-extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
- usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time_ut, usec_t end_time_ut);
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter,
struct rrdeng_page_info *page_info);
-extern struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
- usec_t start_time);
-extern unsigned
- pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time_ut);
+unsigned
+ pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut,
struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp);
-extern struct rrdeng_page_descr *
+struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t point_in_time);
-extern struct rrdeng_page_descr *
+ usec_t point_in_time_ut);
+struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t start_time, usec_t end_time);
-extern struct pg_cache_page_index *create_page_index(uuid_t *id);
-extern void init_page_cache(struct rrdengine_instance *ctx);
-extern void free_page_cache(struct rrdengine_instance *ctx);
-extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
-extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
-extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
-extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
-extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
-
-extern void rrdeng_page_descr_aral_go_singlethreaded(void);
-extern void rrdeng_page_descr_aral_go_multithreaded(void);
-extern void rrdeng_page_descr_use_malloc(void);
-extern void rrdeng_page_descr_use_mmap(void);
-extern bool rrdeng_page_descr_is_mmap(void);
-extern struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void);
-extern void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr);
+ usec_t start_time_ut, usec_t end_time_ut);
+struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx);
+void init_page_cache(struct rrdengine_instance *ctx);
+void free_page_cache(struct rrdengine_instance *ctx);
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
+void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
+unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
+
+void rrdeng_page_descr_aral_go_singlethreaded(void);
+void rrdeng_page_descr_aral_go_multithreaded(void);
+void rrdeng_page_descr_use_malloc(void);
+void rrdeng_page_descr_use_mmap(void);
+bool rrdeng_page_descr_is_mmap(void);
+struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void);
+void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr);
static inline void
- pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
+ pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_time_ut_p, uint32_t *page_lengthp)
{
- usec_t end_time, old_end_time;
+ usec_t end_time_ut, old_end_time_ut;
uint32_t page_length;
if (NULL == descr->extent) {
/* this page is currently being modified, get consistent info locklessly */
do {
- end_time = descr->end_time;
+ end_time_ut = descr->end_time_ut;
__sync_synchronize();
- old_end_time = end_time;
+ old_end_time_ut = end_time_ut;
page_length = descr->page_length;
__sync_synchronize();
- end_time = descr->end_time;
+ end_time_ut = descr->end_time_ut;
__sync_synchronize();
- } while ((end_time != old_end_time || (end_time & 1) != 0));
+ } while ((end_time_ut != old_end_time_ut || (end_time_ut & 1) != 0));
- *end_timep = end_time;
+ *end_time_ut_p = end_time_ut;
*page_lengthp = page_length;
} else {
- *end_timep = descr->end_time;
+ *end_time_ut_p = descr->end_time_ut;
*page_lengthp = descr->page_length;
}
}
/* The caller must hold a reference to the page and must have already set the new data */
-static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
+static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time_ut, uint32_t page_length)
{
- fatal_assert(!(end_time & 1));
+ fatal_assert(!(end_time_ut & 1));
__sync_synchronize();
- descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */
+ descr->end_time_ut |= 1; /* mark start of uncertainty period by adding 1 microsecond */
__sync_synchronize();
descr->page_length = page_length;
__sync_synchronize();
- descr->end_time = end_time; /* mark end of uncertainty period */
+ descr->end_time_ut = end_time_ut; /* mark end of uncertainty period */
}
#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
index cb57385a4..5b4be9498 100644
--- a/database/engine/rrddiskprotocol.h
+++ b/database/engine/rrddiskprotocol.h
@@ -46,8 +46,8 @@ struct rrdeng_extent_page_descr {
uint8_t uuid[UUID_SZ];
uint32_t page_length;
- uint64_t start_time;
- uint64_t end_time;
+ uint64_t start_time_ut;
+ uint64_t end_time_ut;
} __attribute__ ((packed));
/*
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 8b35051d8..e4cd37e98 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -30,7 +30,7 @@ void dbengine_page_free(void *page) {
if (unlikely(db_engine_use_malloc))
freez(page);
else
- munmap(page, RRDENG_BLOCK_SIZE);
+ netdata_munmap(page, RRDENG_BLOCK_SIZE);
}
static void sanity_check(void)
@@ -206,8 +206,8 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
/* care, we don't hold the descriptor mutex */
if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
extent->pages[j]->page_length == descr->page_length &&
- extent->pages[j]->start_time == descr->start_time &&
- extent->pages[j]->end_time == descr->end_time) {
+ extent->pages[j]->start_time_ut == descr->start_time_ut &&
+ extent->pages[j]->end_time_ut == descr->end_time_ut) {
break;
}
page_offset += extent->pages[j]->page_length;
@@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type)
}
}
-void read_extent_cb(uv_fs_t* req)
+static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
{
- struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
- struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
int ret;
@@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req)
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
- xt_io_descr = req->data;
header = xt_io_descr->buf;
payload_length = header->payload_length;
count = header->number_of_pages;
payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
- if (req->result < 0) {
+ if (unlikely(read_failed)) {
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
have_read_error = 1;
- error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__,
- uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos,
+ xt_io_descr->bytes, datafile->tier, datafile->fileno);
goto after_crc_check;
}
crc = crc32(0L, Z_NULL, 0);
@@ -378,8 +375,8 @@ after_crc_check:
/* care, we don't hold the descriptor mutex */
if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) &&
header->descr[i].page_length == descrj->page_length &&
- header->descr[i].start_time == descrj->start_time &&
- header->descr[i].end_time == descrj->end_time) {
+ header->descr[i].start_time_ut == descrj->start_time_ut &&
+ header->descr[i].end_time_ut == descrj->end_time_ut) {
descr = descrj;
break;
}
@@ -387,7 +384,7 @@ after_crc_check:
is_prefetched_page = 0;
if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
- header->descr[i].start_time);
+ header->descr[i].start_time_ut);
if (!descr)
continue; /* Failed to reserve a suitable page */
is_prefetched_page = 1;
@@ -421,11 +418,67 @@ after_crc_check:
}
if (xt_io_descr->completion)
completion_mark_complete(xt_io_descr->completion);
+}
+
+static void read_extent_cb(uv_fs_t *req)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct extent_io_descriptor *xt_io_descr;
+
+ xt_io_descr = req->data;
+ do_extent_processing(wc, xt_io_descr, req->result < 0);
uv_fs_req_cleanup(req);
- free(xt_io_descr->buf);
+ posix_memfree(xt_io_descr->buf);
freez(xt_io_descr);
}
+static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ xt_io_descr = req->data;
+
+ if (likely(xt_io_descr->map_base)) {
+ do_extent_processing(wc, xt_io_descr, false);
+ munmap(xt_io_descr->map_base, xt_io_descr->map_length);
+ freez(xt_io_descr);
+ return;
+ }
+
+ // MMAP failed, so do uv_fs_read
+ int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ xt_io_descr->req.data = xt_io_descr;
+ ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+}
+
+static void do_mmap_read_extent(uv_work_t *req)
+{
+ struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data;
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos);
+ size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start;
+ unsigned real_io_size = xt_io_descr->bytes;
+
+ void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start);
+ if (likely(data != MAP_FAILED)) {
+ xt_io_descr->map_base = data;
+ xt_io_descr->map_length = length;
+ xt_io_descr->buf = data + (xt_io_descr->pos - map_start);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ }
+}
static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdeng_page_descr **descr,
@@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache_descr *pg_cache_descr;
int ret;
- unsigned i, size_bytes, pos, real_io_size;
-// uint32_t payload_length;
+ unsigned i, size_bytes, pos;
struct extent_io_descriptor *xt_io_descr;
struct rrdengine_datafile *datafile;
struct extent_info *extent = descr[0]->extent;
@@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
rrdeng_page_descr_mutex_lock(ctx, descr[i]);
pg_cache_descr = descr[i]->pg_cache_descr;
pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
-// payload_length = descr[i]->page_length;
rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
-
xt_io_descr->descr_array[i] = descr[i];
}
xt_io_descr->descr_count = count;
+ xt_io_descr->file = datafile->file;
xt_io_descr->bytes = size_bytes;
xt_io_descr->pos = pos;
- xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->req_worker.data = xt_io_descr;
xt_io_descr->completion = NULL;
- /* xt_io_descr->descr_commit_idx_array[0] */
xt_io_descr->release_descr = release_descr;
+ xt_io_descr->buf = NULL;
xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
if (xt_is_cached) {
@@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
}
}
- ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- /* freez(xt_io_descr);
- return;*/
- }
- real_io_size = ALIGN_BYTES_CEILING(size_bytes);
- xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
- ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+ ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb);
fatal_assert(-1 != ret);
- ctx->stats.io_read_bytes += real_io_size;
+
++ctx->stats.io_read_requests;
- ctx->stats.io_read_extent_bytes += real_io_size;
++ctx->stats.io_read_extents;
ctx->stats.pg_cache_backfills += count;
}
@@ -696,7 +738,7 @@ void flush_pages_cb(uv_fs_t* req)
if (xt_io_descr->completion)
completion_mark_complete(xt_io_descr->completion);
uv_fs_req_cleanup(req);
- free(xt_io_descr->buf);
+ posix_memfree(xt_io_descr->buf);
freez(xt_io_descr);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
@@ -820,8 +862,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
header->descr[i].type = descr->type;
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
- header->descr[i].start_time = descr->start_time;
- header->descr[i].end_time = descr->end_time;
+ header->descr[i].start_time_ut = descr->start_time_ut;
+ header->descr[i].end_time_ut = descr->end_time_ut;
pos += sizeof(header->descr[i]);
}
for (i = 0 ; i < count ; ++i) {
@@ -922,7 +964,6 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc)
wc->now_deleting_files = NULL;
wc->cleanup_thread_deleting_files = 0;
- aclk_data_rotated();
rrdcontext_db_rotation();
/* interrupt event loop */
@@ -948,12 +989,12 @@ static void delete_old_data(void *arg)
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
- if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) {
+ if (unlikely(can_delete_metric)) {
/*
* If the metric is empty, has no active writers and if the metadata log has been initialized then
* attempt to delete the corresponding netdata dimension.
*/
- metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id);
+ metaqueue_delete_dimension_uuid(&metric_id);
}
}
next = extent->next;
@@ -1044,7 +1085,70 @@ static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
- return init_data_files(ctx);
+ int ret = init_data_files(ctx);
+
+ BUFFER *wb = buffer_create(1000);
+ size_t all_errors = 0;
+ usec_t now = now_realtime_usec();
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) {
+ buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) {
+ buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) {
+ buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter;
+ }
+
+ if(all_errors)
+ info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb));
+
+ buffer_free(wb);
+ return ret;
}
void finalize_rrd_files(struct rrdengine_instance *ctx)
@@ -1139,10 +1243,6 @@ void timer_cb(uv_timer_t* handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
- if (unlikely(!ctx->metalog_ctx->initialized)) {
- worker_is_idle();
- return; /* Wait for the metadata log to initialize */
- }
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
@@ -1329,7 +1429,7 @@ void rrdeng_worker(void* arg)
}
/* cleanup operations of the event loop */
- info("Shutting down RRD engine event loop.");
+ info("Shutting down RRD engine event loop for tier %d", ctx->tier);
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
@@ -1344,7 +1444,7 @@ void rrdeng_worker(void* arg)
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
- info("Shutting down RRD engine event loop complete.");
+ info("Shutting down RRD engine event loop for tier %d complete", ctx->tier);
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 4b383b622..fedadbe86 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -17,7 +17,6 @@
#include "rrdenginelib.h"
#include "datafile.h"
#include "journalfile.h"
-#include "metadata_log/metadatalog.h"
#include "rrdengineapi.h"
#include "pagecache.h"
#include "rrdenglocking.h"
@@ -37,29 +36,25 @@ struct rrdengine_instance;
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
struct rrdeng_collect_handle {
- struct rrdeng_metric_handle *metric_handle;
+ struct pg_cache_page_index *page_index;
struct rrdeng_page_descr *descr;
unsigned long page_correlation_id;
- struct rrdengine_instance *ctx;
// set to 1 when this dimension is not page aligned with the other dimensions in the chart
uint8_t unaligned_page;
};
struct rrdeng_query_handle {
- struct rrdeng_metric_handle *metric_handle;
struct rrdeng_page_descr *descr;
struct rrdengine_instance *ctx;
struct pg_cache_page_index *page_index;
- time_t next_page_time;
- time_t now;
+ time_t wanted_start_time_s;
+ time_t now_s;
unsigned position;
unsigned entries;
- TIER_QUERY_FETCH tier_query_fetch_type;
storage_number *page;
- usec_t page_end_time;
+ usec_t page_end_time_ut;
uint32_t page_length;
- usec_t dt;
- time_t dt_sec;
+ time_t dt_s;
};
typedef enum {
@@ -110,8 +105,12 @@ struct rrdeng_cmdqueue {
struct extent_io_descriptor {
uv_fs_t req;
+ uv_work_t req_worker;
uv_buf_t iov;
+ uv_file file;
void *buf;
+ void *map_base;
+ size_t map_length;
uint64_t pos;
unsigned bytes;
struct completion *completion;
@@ -230,8 +229,16 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele
#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
#define QUIESCED (2) /* is set after all threads have finished running */
+typedef enum {
+ LOAD_ERRORS_PAGE_FLIPPED_TIME = 0,
+ LOAD_ERRORS_PAGE_EQUAL_TIME = 1,
+ LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2,
+ LOAD_ERRORS_PAGE_UPDATE_ZERO = 3,
+ LOAD_ERRORS_PAGE_FLEXY_TIME = 4,
+ LOAD_ERRORS_DROPPED_EXTENT = 5,
+} INVALID_PAGE_ID;
+
struct rrdengine_instance {
- struct metalog_instance *metalog_ctx;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
@@ -254,16 +261,21 @@ struct rrdengine_instance {
uint8_t page_type; /* Default page type for this context */
struct rrdengine_statistics stats;
+
+ struct {
+ size_t counter;
+ usec_t latest_end_time_ut;
+ } load_errors[6];
};
-extern void *dbengine_page_alloc(void);
-extern void dbengine_page_free(void *page);
+void *dbengine_page_alloc(void);
+void dbengine_page_free(void *page);
-extern int init_rrd_files(struct rrdengine_instance *ctx);
-extern void finalize_rrd_files(struct rrdengine_instance *ctx);
-extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
-extern void rrdeng_worker(void* arg);
-extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
-extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+int init_rrd_files(struct rrdengine_instance *ctx);
+void finalize_rrd_files(struct rrdengine_instance *ctx);
+void rrdeng_test_quota(struct rrdengine_worker_config* wc);
+void rrdeng_worker(void* arg);
+void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
+struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
#endif /* NETDATA_RRDENGINE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index f4da29407..27503baee 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
+#include "../storage_engine.h"
/* Default global database instance */
struct rrdengine_instance multidb_ctx_storage_tier0;
@@ -35,14 +36,31 @@ int default_multidb_disk_quota_mb = 256;
/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
-static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) {
- if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0;
- if(!host->storage_instance[tier]) tier = 0;
- return (struct rrdengine_instance *)host->storage_instance[tier];
+// ----------------------------------------------------------------------------
+// metrics groups
+
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
+ return callocz(1, sizeof(struct pg_alignment));
+}
+
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+ if(!smg) return;
+
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct pg_alignment *pa = (struct pg_alignment *)smg;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ if(pa->refcount == 0)
+ freez(pa);
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
}
+// ----------------------------------------------------------------------------
+// metric handle for legacy dbs
+
/* This UUID is not unique across hosts */
-void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid)
+void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid)
{
EVP_MD_CTX *evpctx;
unsigned char hash_value[EVP_MAX_MD_SIZE];
@@ -75,98 +93,136 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-struct rrdeng_metric_handle {
- RRDDIM *rd;
- struct rrdengine_instance *ctx;
- uuid_t *rrdeng_uuid; // database engine metric UUID
- struct pg_cache_page_index *page_index;
-};
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) {
+ uuid_t legacy_uuid;
+ rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
+ return rrdeng_metric_get(db_instance, &legacy_uuid, smg);
+}
-void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) {
- freez(db_metric_handle);
+// ----------------------------------------------------------------------------
+// metric handle
+
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+
+ unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+ if(refcount == 0 && page_index->alignment) {
+ __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST);
+ page_index->alignment = NULL;
+ }
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+ return db_metric_handle;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct page_cache *pg_cache;
- uuid_t legacy_uuid;
- uuid_t multihost_legacy_uuid;
- Pvoid_t *PValue;
+ struct pg_alignment *pa = (struct pg_alignment *)smg;
+ struct page_cache *pg_cache = &ctx->pg_cache;
struct pg_cache_page_index *page_index = NULL;
- int is_multihost_child = 0;
- RRDHOST *host = rd->rrdset->rrdhost;
-
- pg_cache = &ctx->pg_cache;
-
- rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);
- if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
- is_multihost_child = 1;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
+ Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue))
page_index = *PValue;
- }
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (is_multihost_child || NULL == PValue) {
- /* First time we see the legacy UUID or metric belongs to child host in multi-host DB.
- * Drop legacy support, normal path */
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(&rd->metric_uuid);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+
+ if (likely(page_index)) {
+ __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
+
+ if(pa) {
+ if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0)
+ fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).",
+ page_index->refcount, page_index->writers);
+
+ page_index->alignment = pa;
+ __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
}
- } else {
- /* There are legacy UUIDs in the database, implement backward compatibility */
+ }
- rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid,
- &multihost_legacy_uuid);
+ return (STORAGE_METRIC_HANDLE *)page_index;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
+ internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
- int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid);
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct pg_alignment *pa = (struct pg_alignment *)smg;
+ struct pg_cache_page_index *page_index;
+ struct page_cache *pg_cache = &ctx->pg_cache;
- uuid_copy(rd->metric_uuid, multihost_legacy_uuid);
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0);
+ fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(uuid, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
+ page_index->alignment = pa;
+ page_index->refcount = 1;
+ if(pa)
+ pa->refcount++;
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+
+ return (STORAGE_METRIC_HANDLE *)page_index;
+}
+
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+ STORAGE_METRIC_HANDLE *db_metric_handle;
+
+ db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg);
+ if(!db_metric_handle) {
+ db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg);
+ if(db_metric_handle) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ uuid_copy(rd->metric_uuid, page_index->id);
+ }
+ }
+ if(!db_metric_handle)
+ db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg);
- if (unlikely(need_to_store && !ctx->tier))
- (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
- rd->algorithm);
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ if(uuid_compare(rd->metric_uuid, page_index->id) != 0) {
+ char uuid1[UUID_STR_LEN + 1];
+ char uuid2[UUID_STR_LEN + 1];
+
+ uuid_unparse(rd->metric_uuid, uuid1);
+ uuid_unparse(page_index->id, uuid2);
+ fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2);
}
- struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle));
- mh->rd = rd;
- mh->ctx = ctx;
- mh->rrdeng_uuid = &page_index->id;
- mh->page_index = page_index;
- return (STORAGE_METRIC_HANDLE *)mh;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ if(page_index->ctx != ctx)
+ fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx);
+#endif
+
+ return db_metric_handle;
}
+
+// ----------------------------------------------------------------------------
+// collect ops
+
/*
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
-
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) {
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
struct rrdeng_collect_handle *handle;
- struct pg_cache_page_index *page_index;
+
+ if(!page_index->alignment)
+ fatal("DBENGINE: metric group is required for collect operations");
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
- handle->metric_handle = metric_handle;
- handle->ctx = metric_handle->ctx;
+ handle->page_index = page_index;
handle->descr = NULL;
handle->unaligned_page = 0;
+ page_index->latest_update_every_s = update_every;
- page_index = metric_handle->page_index;
uv_rwlock_wrlock(&page_index->lock);
++page_index->writers;
uv_rwlock_wrunlock(&page_index->lock);
@@ -214,7 +270,7 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
// struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
- struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdengine_instance *ctx = handle->page_index->ctx;
struct rrdeng_page_descr *descr = handle->descr;
if (unlikely(!ctx)) return;
@@ -227,9 +283,7 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
page_is_empty = page_has_only_empty_metrics(descr);
if (page_is_empty) {
- debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "Page has empty metrics only, deleting", true);
pg_cache_put(ctx, descr);
pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
} else
@@ -242,8 +296,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
handle->descr = NULL;
}
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
- usec_t point_in_time,
+static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle,
+ usec_t point_in_time_ut,
NETDATA_DOUBLE n,
NETDATA_DOUBLE min_value,
NETDATA_DOUBLE max_value,
@@ -252,11 +306,10 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
SN_FLAGS flags)
{
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
- struct rrdengine_instance *ctx = handle->ctx;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ struct rrdengine_instance *ctx = handle->page_index->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = handle->descr;
- RRDDIM *rd = metric_handle->rd;
void *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
@@ -264,21 +317,33 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if (descr) {
/* Make alignment decisions */
- if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) {
+ char buffer[200 + 1];
+ snprintfz(buffer, 200,
+ "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu",
+ (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned",
+ descr->end_time_ut / USEC_PER_SEC,
+ point_in_time_ut / USEC_PER_SEC,
+ page_index->latest_update_every_s,
+ point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC);
+ print_page_cache_descr(descr, buffer, false);
+ }
+#endif
+
+ if (descr->page_length == page_index->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
/* is the metric far enough out of alignment with the others? */
- if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) {
+ if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) {
handle->unaligned_page = 1;
- debug(D_RRDENGINE, "Metric page is not aligned with chart:");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
}
if (unlikely(handle->unaligned_page &&
/* did the other metrics change page? */
- rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) {
- debug(D_RRDENGINE, "Flushing unaligned metric page.");
+ page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
+ print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
must_flush_unaligned_page = 1;
handle->unaligned_page = 0;
}
@@ -286,16 +351,21 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if (unlikely(NULL == descr ||
descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE ||
must_flush_unaligned_page)) {
- rrdeng_store_metric_flush_current_page(collection_handle);
- page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr);
+ if(descr) {
+ print_page_cache_descr(descr, "flushing metric", true);
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+
+ page = rrdeng_create_page(ctx, &page_index->id, &descr);
fatal_assert(page);
+ descr->update_every_s = page_index->latest_update_every_s;
handle->descr = descr;
handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
- if (0 == rd->rrdset->rrddim_page_alignment) {
+ if (0 == page_index->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
@@ -330,13 +400,13 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
break;
}
- pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
+ pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
if (perfect_page_alignment)
- rd->rrdset->rrddim_page_alignment = descr->page_length;
- if (unlikely(INVALID_TIME == descr->start_time)) {
+ page_index->alignment->page_length = descr->page_length;
+ if (unlikely(INVALID_TIME == descr->start_time_ut)) {
unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
- descr->start_time = point_in_time;
+ descr->start_time_ut = point_in_time_ut;
new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
@@ -350,20 +420,111 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
}
}
- pg_cache_insert(ctx, metric_handle->page_index, descr);
+ pg_cache_insert(ctx, page_index, descr);
} else {
- pg_cache_add_new_metric_time(metric_handle->page_index, descr);
+ pg_cache_add_new_metric_time(page_index, descr);
}
+
+// {
+// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
+// if(uuid_compare(u, page_index->id) == 0) {
+// char buffer[100];
+// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u",
+// (uint32_t)(point_in_time / USEC_PER_SEC),
+// (uint32_t)(page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// }
+// }
}
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
+ usec_t point_in_time_ut,
+ NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags)
+{
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ struct rrdeng_page_descr *descr = handle->descr;
+
+ if(likely(descr)) {
+ usec_t last_point_in_time_ut = descr->end_time_ut;
+ usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC;
+ size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ?
+ (size_t)0 :
+ (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut);
+
+ if(unlikely(points_gap != 1)) {
+ if (unlikely(points_gap <= 0)) {
+ time_t now = now_realtime_sec();
+ static __thread size_t counter = 0;
+ static __thread time_t last_time_logged = 0;
+ counter++;
+
+ if(now - last_time_logged > 600) {
+ error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.",
+ counter, (size_t)(last_time_logged?(now - last_time_logged):0));
+
+ last_time_logged = now;
+ counter = 0;
+ }
+ return;
+ }
+
+ size_t point_size = PAGE_POINT_SIZE_BYTES(descr);
+ size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size;
+ size_t used_points = descr->page_length / point_size;
+ size_t remaining_points_in_page = page_size_in_points - used_points;
+
+ bool new_point_is_aligned = true;
+ if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut))
+ new_point_is_aligned = false;
+
+ if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) {
+// char buffer[200];
+// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.",
+// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
+// print_page_cache_descr(descr, buffer, false);
+
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+// char buffer[200];
+// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.",
+// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
+// print_page_cache_descr(descr, buffer, false);
+
+ // loop to fill the gap
+ usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC;
+ usec_t last_point_filled_ut = last_point_in_time_ut + step_ut;
+
+ while (last_point_filled_ut < point_in_time_ut) {
+ rrdeng_store_metric_next_internal(
+ collection_handle, last_point_filled_ut, NAN, NAN, NAN,
+ 1, 0, SN_EMPTY_SLOT);
+
+ last_point_filled_ut += step_ut;
+ }
+ }
+ }
+ }
+
+ rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags);
+}
+
+
/*
* Releases the database reference from the handle for storing metrics.
* Returns 1 if it's safe to delete the dimension.
*/
int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
- struct pg_cache_page_index *page_index = metric_handle->page_index;
+ struct pg_cache_page_index *page_index = handle->page_index;
uint8_t can_delete_metric = 0;
@@ -378,6 +539,18 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
return can_delete_metric;
}
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct pg_cache_page_index *page_index = handle->page_index;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ uv_rwlock_rdlock(&page_index->lock);
+ page_index->latest_update_every_s = update_every;
+ uv_rwlock_rdunlock(&page_index->lock);
+}
+
+// ----------------------------------------------------------------------------
+// query ops
+
//static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
//{
// return (uint32_t *)&page_info->scratch[0];
@@ -392,49 +565,45 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
* Gets a handle for loading metrics from the database.
* The handle must be released with rrdeng_load_metric_final().
*/
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type)
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s)
{
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
- struct rrdengine_instance *ctx = metric_handle->ctx;
- RRDDIM *rd = metric_handle->rd;
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ struct rrdengine_instance *ctx = page_index->ctx;
// fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);
struct rrdeng_query_handle *handle;
unsigned pages_nr;
- rrdimm_handle->start_time = start_time;
- rrdimm_handle->end_time = end_time;
+ if(!page_index->latest_update_every_s)
+ page_index->latest_update_every_s = default_rrd_update_every;
+
+ rrdimm_handle->start_time_s = start_time_s;
+ rrdimm_handle->end_time_s = end_time_s;
handle = callocz(1, sizeof(struct rrdeng_query_handle));
- handle->next_page_time = start_time;
- handle->now = start_time;
- handle->tier_query_fetch_type = tier_query_fetch_type;
- // TODO we should store the dt of each page in each page
- // this will produce wrong values for dt in case the user changes
- // the update every of the charts or the tier grouping iterations
- handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every;
- handle->dt = handle->dt_sec * USEC_PER_SEC;
+ handle->wanted_start_time_s = start_time_s;
+ handle->now_s = start_time_s;
handle->position = 0;
handle->ctx = ctx;
- handle->metric_handle = metric_handle;
handle->descr = NULL;
+ handle->dt_s = page_index->latest_update_every_s;
rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
- pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
+ pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
// there are no metrics to load
- handle->next_page_time = INVALID_TIME;
+ handle->wanted_start_time_s = INVALID_TIME;
}
-static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
+static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) {
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
struct rrdengine_instance *ctx = handle->ctx;
struct rrdeng_page_descr *descr = handle->descr;
uint32_t page_length;
- usec_t page_end_time;
+ usec_t page_end_time_ut;
unsigned position;
if (likely(descr)) {
@@ -446,14 +615,15 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
pg_cache_put(ctx, descr);
handle->descr = NULL;
- handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1;
+ handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s);
- if (unlikely(handle->next_page_time > rrdimm_handle->end_time))
+ if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s))
return 1;
}
- usec_t next_page_time = handle->next_page_time * USEC_PER_SEC;
- descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
+ usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC;
+ descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
+ wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC);
if (NULL == descr)
return 1;
@@ -462,77 +632,116 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
#endif
handle->descr = descr;
- pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
- if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time))
+ pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length);
+ if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) {
+ error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)",
+ descr->start_time_ut, page_end_time_ut, descr->update_every_s);
return 1;
+ }
- if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
+ if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) {
// we're in the middle of the page somewhere
unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
- position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
- (page_end_time - descr->start_time);
+ position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) /
+ (page_end_time_ut - descr->start_time_ut);
}
else
position = 0;
- handle->page_end_time = page_end_time;
+ handle->page_end_time_ut = page_end_time_ut;
handle->page_length = page_length;
+ handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
handle->page = descr->pg_cache_descr->page;
- usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
- if (likely(entries > 1))
- handle->dt = (page_end_time - descr->start_time) / (entries - 1);
- else {
- // TODO we should store the dt of each page in each page
- // now we keep the dt of whatever was before
- ;
- }
-
- handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC);
+ handle->dt_s = descr->update_every_s;
handle->position = position;
+// if(debug_this)
+// info("DBENGINE: rrdeng_load_page_next(), "
+// "position:%d, "
+// "start_time_ut:%llu, "
+// "page_end_time_ut:%llu, "
+// "next_page_time_ut:%llu, "
+// "in_out:%s"
+// , position
+// , descr->start_time_ut
+// , page_end_time_ut
+// ,
+// wanted_start_time_ut, in_out?"true":"false"
+// );
+
return 0;
}
// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
-STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
// struct rrdeng_metric_handle *metric_handle = handle->metric_handle;
- STORAGE_POINT sp;
struct rrdeng_page_descr *descr = handle->descr;
+ time_t now = handle->now_s + handle->dt_s;
+
+// bool debug_this = false;
+// {
+// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 };
+// if(uuid_compare(u, handle->page_index->id) == 0) {
+// char buffer[100];
+// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
+// (uint32_t)(now),
+// (uint32_t)(handle->dt_s),
+// (uint32_t)(handle->position),
+// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// debug_this = true;
+// }
+// }
+
+ STORAGE_POINT sp;
unsigned position = handle->position + 1;
- time_t now = handle->now + handle->dt_sec;
storage_number_tier1_t tier1_value;
- if (unlikely(INVALID_TIME == handle->next_page_time)) {
- handle->next_page_time = INVALID_TIME;
- handle->now = now;
- storage_point_empty(sp, now - handle->dt_sec, now);
+ if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) {
+ handle->wanted_start_time_s = INVALID_TIME;
+ handle->now_s = now;
+ storage_point_empty(sp, now - handle->dt_s, now);
return sp;
}
if (unlikely(!descr || position >= handle->entries)) {
// We need to get a new page
- if(rrdeng_load_page_next(rrdimm_handle)) {
+ if(rrdeng_load_page_next(rrddim_handle, false)) {
// next calls will not load any more metrics
- handle->next_page_time = INVALID_TIME;
- handle->now = now;
- storage_point_empty(sp, now - handle->dt_sec, now);
+ handle->wanted_start_time_s = INVALID_TIME;
+ handle->now_s = now;
+ storage_point_empty(sp, now - handle->dt_s, now);
return sp;
}
descr = handle->descr;
position = handle->position;
- now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC);
+ now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s);
+
+// if(debug_this) {
+// char buffer[100];
+// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u",
+// (uint32_t)(now),
+// (uint32_t)(handle->dt_s),
+// (uint32_t)(handle->position),
+// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC),
+// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC));
+//
+// print_page_cache_descr(descr, buffer, false);
+// }
}
- sp.start_time = now - handle->dt_sec;
+ sp.start_time = now - handle->dt_s;
sp.end_time = now;
handle->position = position;
- handle->now = now;
+ handle->now_s = now;
switch(descr->type) {
case PAGE_METRICS: {
@@ -567,24 +776,32 @@ STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
break;
}
- if (unlikely(now >= rrdimm_handle->end_time)) {
+ if (unlikely(now >= rrddim_handle->end_time_s)) {
// next calls will not load any more metrics
- handle->next_page_time = INVALID_TIME;
+ handle->wanted_start_time_s = INVALID_TIME;
}
+// if(debug_this)
+// info("DBENGINE: returning point: "
+// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld"
+// , sp.start_time, sp.end_time
+// , rrddim_handle->start_time_s, rrddim_handle->end_time_s
+// , handle->wanted_start_time_s
+// );
+
return sp;
}
-int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
- return (INVALID_TIME == handle->next_page_time);
+ return (INVALID_TIME == handle->wanted_start_time_s);
}
/*
* Releases the database reference from the handle for loading metrics.
*/
-void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
struct rrdengine_instance *ctx = handle->ctx;
@@ -603,46 +820,12 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
}
time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
-
- struct pg_cache_page_index *page_index = metric_handle->page_index;
- return page_index->latest_time / USEC_PER_SEC;
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ return (time_t)(page_index->latest_time_ut / USEC_PER_SEC);
}
time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
-
- struct pg_cache_page_index *page_index = metric_handle->page_index;
- return page_index->oldest_time / USEC_PER_SEC;
-}
-
-int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier)
-{
- struct page_cache *pg_cache;
- struct rrdengine_instance *ctx;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
-
- ctx = get_rrdeng_ctx_from_host(localhost, tier);
- if (unlikely(!ctx)) {
- error("Failed to fetch multidb context");
- return 1;
- }
- pg_cache = &ctx->pg_cache;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
-
- if (likely(page_index)) {
- *first_entry_t = page_index->oldest_time / USEC_PER_SEC;
- *last_entry_t = page_index->latest_time / USEC_PER_SEC;
- return 0;
- }
-
- return 1;
+ struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
+ return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC);
}
int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
@@ -667,8 +850,8 @@ int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
if (likely(page_index)) {
- *first_entry_t = page_index->oldest_time / USEC_PER_SEC;
- *last_entry_t = page_index->latest_time / USEC_PER_SEC;
+ *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC;
+ *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC;
return 0;
}
@@ -695,7 +878,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde
debug(D_RRDENGINE, "Created new page:");
if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ print_page_cache_descr(descr, "", true);
rrdeng_page_descr_mutex_unlock(ctx, descr);
*ret_descr = descr;
return page;
@@ -767,13 +950,13 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
}
/* Gets a reference for the page */
-void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle)
{
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
debug(D_RRDENGINE, "Reading existing page:");
- descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
+ descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut);
if (NULL == descr) {
*handle = NULL;
@@ -849,7 +1032,7 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
* Returns 0 on success, negative on error
*/
int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
- unsigned disk_space_mb, int tier) {
+ unsigned disk_space_mb, size_t tier) {
struct rrdengine_instance *ctx;
int error;
uint32_t max_open_files;
@@ -897,7 +1080,6 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
ctx->metric_API_max_producers = 0;
ctx->quiesce = NO_QUIESCE;
- ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */
ctx->host = host;
memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
@@ -918,11 +1100,11 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}
- error = metalog_init(ctx);
- if (error) {
- error("Failed to initialize metadata log file event loop.");
- goto error_after_rrdeng_worker;
- }
+// error = metalog_init(ctx);
+// if (error) {
+// error("Failed to initialize metadata log file event loop.");
+// goto error_after_rrdeng_worker;
+// }
return 0;
@@ -1010,13 +1192,13 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
if(likely(points > 1))
- update_every_usec = (descr->end_time - descr->start_time) / (points - 1);
+ update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1);
else {
update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC;
stats.single_point_pages++;
}
- time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC);
+ time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC);
stats.extents_pages++;
stats.pages_uncompressed_bytes += descr->page_length;
@@ -1028,11 +1210,11 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
stats.page_types[descr->type].pages_duration_secs += duration_secs;
stats.page_types[descr->type].points += points;
- if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t)
- stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC;
+ if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t)
+ stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC;
- if(!stats.last_t || descr->end_time > stats.last_t)
- stats.last_t = descr->end_time / USEC_PER_SEC;
+ if(!stats.last_t || descr->end_time_ut > stats.last_t)
+ stats.last_t = descr->end_time_ut / USEC_PER_SEC;
}
}
}
@@ -1072,7 +1254,7 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
}
}
- stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index));
+ stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment));
stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr));
stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile));
stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr));
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 509aa48ca..85375044f 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -25,58 +25,63 @@ extern size_t page_type_size[];
#define PAGE_POINT_SIZE_BYTES(x) page_type_size[(x)->type]
struct rrdeng_region_info {
- time_t start_time;
+ time_t start_time_s;
int update_every;
unsigned points;
};
-extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
-extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id);
-extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
-extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
-extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
+void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
+void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle);
+void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
-extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid);
-extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
+void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid);
+void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
uuid_t *ret_uuid);
-extern STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
-extern void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg);
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
-extern STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle);
-extern void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
-extern void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE n,
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every);
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n,
NETDATA_DOUBLE min_value,
NETDATA_DOUBLE max_value,
uint16_t count,
uint16_t anomaly_count,
SN_FLAGS flags);
-extern int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
-extern unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
- struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list);
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle,
+ time_t start_time_s, time_t end_time_s);
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
-extern void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle,
- time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type);
-extern STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle);
-extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
-extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
-extern time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-extern time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle);
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
-extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
- unsigned disk_space_mb, int tier);
+int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
+ unsigned disk_space_mb, size_t tier);
-extern int rrdeng_exit(struct rrdengine_instance *ctx);
-extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
-extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier);
-extern int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t);
+int rrdeng_exit(struct rrdengine_instance *ctx);
+void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
+int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t);
+
+extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
typedef struct rrdengine_size_statistics {
size_t default_granularity_secs;
@@ -134,6 +139,6 @@ typedef struct rrdengine_size_statistics {
double average_page_size_bytes;
} RRDENG_SIZE_STATS;
-extern RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
+RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 287b86be8..58bd9c437 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -4,28 +4,45 @@
#define BUFSIZE (512)
/* Caller must hold descriptor lock */
-void print_page_cache_descr(struct rrdeng_page_descr *descr)
+void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug)
{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- char uuid_str[UUID_STR_LEN];
- char str[BUFSIZE + 1];
- int pos = 0;
+ if(log_debug && !(debug_flags & D_RRDENGINE))
+ return;
- uuid_unparse_lower(*descr->id, uuid_str);
- pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n"
- "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
- pg_cache_descr->page, uuid_str,
- descr->page_length,
- (uint64_t)descr->start_time,
- (uint64_t)descr->end_time);
- if (!descr->extent) {
- pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
- } else {
- pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
+ BUFFER *wb = buffer_create(512);
+
+ if(!descr) {
+ buffer_sprintf(wb, "DBENGINE: %s : descr is NULL", msg);
}
+ else {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ char uuid_str[UUID_STR_LEN];
+
+ uuid_unparse_lower(*descr->id, uuid_str);
+ buffer_sprintf(wb, "DBENGINE: %s : page(%p) metric:%s, len:%"PRIu32", time:%"PRIu64"->%"PRIu64", update_every:%u, type:%u, xt_offset:",
+ msg,
+ pg_cache_descr->page, uuid_str,
+ descr->page_length,
+ (uint64_t)descr->start_time_ut,
+ (uint64_t)descr->end_time_ut,
+ (uint32_t)descr->update_every_s,
+ (uint32_t)descr->type
+ );
+ if (!descr->extent) {
+ buffer_strcat(wb, "N/A");
+ } else {
+ buffer_sprintf(wb, "%"PRIu64, descr->extent->offset);
+ }
+
+ buffer_sprintf(wb, ", flags:0x%2.2lX refcnt:%u", pg_cache_descr->flags, pg_cache_descr->refcnt);
+ }
+
+ if(log_debug)
+ debug(D_RRDENGINE, "%s", buffer_tostring(wb));
+ else
+ internal_error(true, "%s", buffer_tostring(wb));
- snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt);
- debug(D_RRDENGINE, "%s", str);
+ buffer_free(wb);
}
void print_page_descr(struct rrdeng_page_descr *descr)
@@ -39,8 +56,8 @@ void print_page_descr(struct rrdeng_page_descr *descr)
"--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:",
uuid_str,
descr->page_length,
- (uint64_t)descr->start_time,
- (uint64_t)descr->end_time);
+ (uint64_t)descr->start_time_ut,
+ (uint64_t)descr->end_time_ut);
if (!descr->extent) {
pos += snprintfz(str + pos, BUFSIZE - pos, "N/A");
} else {
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index 32eebf103..6b1a15fb1 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -83,10 +83,10 @@ static inline void crc32set(void *crcp, uLong crc)
*(uint32_t *)crcp = crc;
}
-extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr);
-extern void print_page_descr(struct rrdeng_page_descr *descr);
-extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
-extern int open_file_for_io(char *path, int flags, uv_file *file, int direct);
+void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug);
+void print_page_descr(struct rrdeng_page_descr *descr);
+int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);
+int open_file_for_io(char *path, int flags, uv_file *file, int direct);
static inline int open_file_direct_io(char *path, int flags, uv_file *file)
{
return open_file_for_io(path, flags, file, 1);
@@ -95,8 +95,8 @@ static inline int open_file_buffered_io(char *path, int flags, uv_file *file)
{
return open_file_for_io(path, flags, file, 0);
}
-extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
-extern int compute_multidb_diskspace();
-extern int is_legacy_child(const char *machine_guid);
+char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size);
+int compute_multidb_diskspace();
+int is_legacy_child(const char *machine_guid);
#endif /* NETDATA_RRDENGINELIB_H */
diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h
index 127ddc90c..078eab38b 100644
--- a/database/engine/rrdenglocking.h
+++ b/database/engine/rrdenglocking.h
@@ -8,10 +8,10 @@
/* Forward declarations */
struct page_cache_descr;
-extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
-extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
-extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
-extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
-extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx);
+void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr);
+void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
#endif /* NETDATA_RRDENGLOCKING_H */ \ No newline at end of file