diff options
Diffstat (limited to 'database/engine')
26 files changed, 902 insertions, 1569 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am index 43405001..59250a99 100644 --- a/database/engine/Makefile.am +++ b/database/engine/Makefile.am @@ -4,7 +4,6 @@ AUTOMAKE_OPTIONS = subdir-objects MAINTAINERCLEANFILES = $(srcdir)/Makefile.in SUBDIRS = \ - metadata_log \ $(NULL) dist_noinst_DATA = \ diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 2ed98ef8..9c70068d 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -174,7 +174,7 @@ int create_data_file(struct rrdengine_datafile *datafile) rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); - free(superblock); + posix_memfree(superblock); if (ret < 0) { destroy_data_file(datafile); return ret; @@ -218,7 +218,7 @@ static int check_data_file_superblock(uv_file file) ret = 0; } error: - free(superblock); + posix_memfree(superblock); return ret; } @@ -444,44 +444,17 @@ void finalize_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; struct extent_info *extent, *next_extent; - size_t extents_number = 0; - size_t extents_bytes = 0; - size_t page_compressed_sizes = 0; - - size_t files_number = 0; - size_t files_bytes = 0; - for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { journalfile = datafile->journalfile; next_datafile = datafile->next; for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { - extents_number++; - extents_bytes += sizeof(*extent) + sizeof(struct rrdeng_page_descr *) * extent->number_of_pages; - page_compressed_sizes += extent->size; - next_extent = extent->next; freez(extent); } close_journal_file(journalfile, datafile); close_data_file(datafile); - - files_number++; - files_bytes += sizeof(*journalfile) + sizeof(*datafile); - freez(journalfile); freez(datafile); } - - if(!files_number) files_number = 1; - if(!extents_number) extents_number = 1; - - info("DBENGINE STATISTICS ON DATAFILES:" - " Files %zu, structures %zu bytes, %0.2f bytes per file." - " Extents %zu, structures %zu bytes, %0.2f bytes per extent." - " Compressed size of all pages: %zu bytes." - , files_number, files_bytes, (double)files_bytes/files_number - , extents_number, extents_bytes, (double)extents_bytes/extents_number - , page_compressed_sizes - ); } diff --git a/database/engine/datafile.h b/database/engine/datafile.h index ae94bfdd..1cf256af 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -52,16 +52,16 @@ struct rrdengine_datafile_list { struct rrdengine_datafile *last; /* newest */ }; -extern void df_extent_insert(struct extent_info *extent); -extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); -extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); -extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); -extern int close_data_file(struct rrdengine_datafile *datafile); -extern int unlink_data_file(struct rrdengine_datafile *datafile); -extern int destroy_data_file(struct rrdengine_datafile *datafile); -extern int create_data_file(struct rrdengine_datafile *datafile); -extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); -extern int init_data_files(struct rrdengine_instance *ctx); -extern void finalize_data_files(struct rrdengine_instance *ctx); +void df_extent_insert(struct extent_info *extent); +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +int close_data_file(struct rrdengine_datafile *datafile); +int unlink_data_file(struct rrdengine_datafile *datafile); +int destroy_data_file(struct rrdengine_datafile *datafile); +int create_data_file(struct rrdengine_datafile *datafile); +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +int init_data_files(struct rrdengine_instance *ctx); +void finalize_data_files(struct rrdengine_instance *ctx); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index dc61f569..500dd788 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -17,7 +17,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) } uv_fs_req_cleanup(req); - free(io_descr->buf); + posix_memfree(io_descr->buf); freez(io_descr); } @@ -225,7 +225,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); - free(superblock); + posix_memfree(superblock); if (ret < 0) { destroy_journal_file(journalfile, datafile); return ret; @@ -268,7 +268,7 @@ static int check_journal_file_superblock(uv_file file) ret = 0; } error: - free(superblock); + posix_memfree(superblock); return ret; } @@ -311,20 +311,46 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden } continue; } - uint64_t start_time = jf_metric_data->descr[i].start_time; - uint64_t end_time = jf_metric_data->descr[i].end_time; + uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut; + uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut; + size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type]; + time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0; + + if (unlikely(start_time_ut > end_time_ut)) { + ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut; + continue; + } - if (unlikely(start_time > end_time)) { - error("Invalid page encountered, start time %lu > end time %lu", start_time , end_time ); + if (unlikely(start_time_ut == end_time_ut && entries != 1)) { + ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut; continue; } - if (unlikely(start_time == end_time)) { - size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type]; - if (unlikely(entries > 1)) { - error("Invalid page encountered, start time %lu = end time but %zu entries were found", start_time, entries); - continue; - } + if (unlikely(!entries)) { + ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut; + continue; + } + + if(entries > 1 && update_every_s == 0) { + ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut; + continue; + } + + if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) { + ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++; + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut) + ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut; + + // let this be + // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1); } temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; @@ -340,7 +366,7 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(temp_id); + *PValue = page_index = create_page_index(temp_id, ctx); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); @@ -348,21 +374,32 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden descr = pg_cache_create_descr(); descr->page_length = jf_metric_data->descr[i].page_length; - descr->start_time = start_time; - descr->end_time = end_time; + descr->start_time_ut = start_time_ut; + descr->end_time_ut = end_time_ut; + descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s); descr->id = &page_index->id; descr->extent = extent; descr->type = page_type; extent->pages[valid_pages++] = descr; pg_cache_insert(ctx, page_index, descr); + + if(page_index->latest_time_ut == descr->end_time_ut) + page_index->latest_update_every_s = descr->update_every_s; + + if(descr->update_every_s == 0) + fatal( + "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", + (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries); } extent->number_of_pages = valid_pages; if (likely(valid_pages)) df_extent_insert(extent); - else + else { freez(extent); + ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++; + } } /* @@ -442,27 +479,30 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde //data_file_size = journalfile->datafile->pos; TODO: utilize this? max_id = 1; - ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); + bool journal_is_mmapped = (journalfile->data != NULL); + if (unlikely(!journal_is_mmapped)) { + ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES); + if (unlikely(ret)) + fatal("posix_memalign:%s", strerror(ret)); } - + else + buf = journalfile->data + sizeof(struct rrdeng_jf_sb); for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) { size_bytes = MIN(READAHEAD_BYTES, file_size - pos); - iov = uv_buf_init(buf, size_bytes); - ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); - if (ret < 0) { - error("uv_fs_read: pos=%"PRIu64", %s", pos, uv_strerror(ret)); + if (unlikely(!journal_is_mmapped)) { + iov = uv_buf_init(buf, size_bytes); + ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); + if (ret < 0) { + error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto skip_file; + } + fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); - goto skip_file; + ++ctx->stats.io_read_requests; + ctx->stats.io_read_bytes += size_bytes; } - fatal_assert(req.result >= 0); - uv_fs_req_cleanup(&req); - ctx->stats.io_read_bytes += size_bytes; - ++ctx->stats.io_read_requests; - //pos_i = pos; - //while (pos_i < pos + size_bytes) { for (pos_i = 0 ; pos_i < size_bytes ; ) { unsigned max_size; @@ -475,9 +515,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde pos_i += ret; max_id = MAX(max_id, id); } + if (likely(journal_is_mmapped)) + buf += size_bytes; } skip_file: - free(buf); + if (unlikely(!journal_is_mmapped)) + posix_memfree(buf); return max_id; } @@ -512,12 +555,16 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi journalfile->file = file; journalfile->pos = file_size; + journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0); + info("Loading journal file \"%s\" using %s.", path, journalfile->data?"MMAP":"uv_fs_read"); max_id = iterate_transactions(ctx, journalfile); ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1); info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size); + if (likely(journalfile->data)) + netdata_munmap(journalfile->data, file_size); return 0; error: diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index f6c43cd1..011c5065 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -19,7 +19,7 @@ struct rrdengine_journalfile; struct rrdengine_journalfile { uv_file file; uint64_t pos; - + void *data; struct rrdengine_datafile *datafile; }; @@ -33,17 +33,17 @@ struct transaction_commit_log { unsigned buf_size; }; -extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); -extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); -extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); -extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); -extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); -extern int unlink_journal_file(struct rrdengine_journalfile *journalfile); -extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); -extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); -extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int unlink_journal_file(struct rrdengine_journalfile *journalfile); +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); -extern void init_commit_log(struct rrdengine_instance *ctx); +void init_commit_log(struct rrdengine_instance *ctx); #endif /* NETDATA_JOURNALFILE_H */
\ No newline at end of file diff --git a/database/engine/metadata_log/Makefile.am b/database/engine/metadata_log/Makefile.am deleted file mode 100644 index 161784b8..00000000 --- a/database/engine/metadata_log/Makefile.am +++ /dev/null @@ -1,8 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -dist_noinst_DATA = \ - README.md \ - $(NULL) diff --git a/database/engine/metadata_log/compaction.c b/database/engine/metadata_log/compaction.c deleted file mode 100644 index ba19e1ed..00000000 --- a/database/engine/metadata_log/compaction.c +++ /dev/null @@ -1,86 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#define NETDATA_RRD_INTERNALS - -#include "metadatalog.h" - -/* Return 0 on success. */ -int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, - unsigned *matched_files) -{ - int ret; - unsigned starting_fileno, fileno, i, j, recovered_files; - struct metadata_logfile *metalogfile = NULL, *compactionfile = NULL, **tmp_metalogfiles; - char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; - - for (i = 0 ; i < *matched_files ; ++i) { - metalogfile = metalogfiles[i]; - if (0 == metalogfile->starting_fileno) - continue; /* skip standard metadata log files */ - break; /* this is a compaction temporary file */ - } - if (i == *matched_files) /* no recovery needed */ - return 0; - info("Starting metadata log file failure recovery procedure in \"%s\".", dbfiles_path); - - if (*matched_files - i > 1) { /* Can't have more than 1 temporary compaction files */ - error("Metadata log files are in an invalid state. Cannot proceed."); - return 1; - } - compactionfile = metalogfile; - starting_fileno = compactionfile->starting_fileno; - fileno = compactionfile->fileno; - /* scratchpad space to move file pointers around */ - tmp_metalogfiles = callocz(*matched_files, sizeof(*tmp_metalogfiles)); - - for (j = 0, recovered_files = 0 ; j < i ; ++j) { - metalogfile = metalogfiles[j]; - fatal_assert(0 == metalogfile->starting_fileno); - if (metalogfile->fileno < starting_fileno) { - tmp_metalogfiles[recovered_files++] = metalogfile; - continue; - } - break; /* reached compaction file serial number */ - } - - if ((j == i) /* Shouldn't be possible, invalid compaction temporary file */ || - (metalogfile->fileno == starting_fileno && metalogfile->fileno == fileno)) { - error("Deleting invalid compaction temporary file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL - METALOG_EXTENSION"\"", dbfiles_path, starting_fileno, fileno); - unlink_metadata_logfile(compactionfile); - freez(compactionfile); - freez(tmp_metalogfiles); - --*matched_files; /* delete the last one */ - - info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); - return 0; - } - - for ( ; j < i ; ++j) { /* continue iterating through normal metadata log files */ - metalogfile = metalogfiles[j]; - fatal_assert(0 == metalogfile->starting_fileno); - if (metalogfile->fileno < fileno) { /* It has already been compacted */ - error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL - METALOG_EXTENSION"\"", dbfiles_path, 0U, metalogfile->fileno); - unlink_metadata_logfile(metalogfile); - freez(metalogfile); - continue; - } - tmp_metalogfiles[recovered_files++] = metalogfile; - } - - /* compaction temporary file is valid */ - tmp_metalogfiles[recovered_files++] = compactionfile; - ret = rename_metadata_logfile(compactionfile, 0, starting_fileno); - if (ret < 0) { - error("Cannot rename temporary compaction files. Cannot proceed."); - freez(tmp_metalogfiles); - return 1; - } - - memcpy(metalogfiles, tmp_metalogfiles, recovered_files * sizeof(*metalogfiles)); - *matched_files = recovered_files; - freez(tmp_metalogfiles); - - info("Finished metadata log file failure recovery procedure in \"%s\".", dbfiles_path); - return 0; -} diff --git a/database/engine/metadata_log/compaction.h b/database/engine/metadata_log/compaction.h deleted file mode 100644 index d0461344..00000000 --- a/database/engine/metadata_log/compaction.h +++ /dev/null @@ -1,14 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_COMPACTION_H -#define NETDATA_COMPACTION_H - -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif -#include "../rrdengine.h" - -extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles, - unsigned *matched_files); - -#endif /* NETDATA_COMPACTION_H */ diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c deleted file mode 100644 index 07eb9b6f..00000000 --- a/database/engine/metadata_log/logfile.c +++ /dev/null @@ -1,447 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include <database/sqlite/sqlite_functions.h> -#include "metadatalog.h" -#include "metalogpluginsd.h" - - -void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen) -{ - (void) snprintfz(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION, - metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); -} - -void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno, - unsigned fileno) -{ - metalogfile->starting_fileno = starting_fileno; - metalogfile->fileno = fileno; - metalogfile->file = (uv_file)0; - metalogfile->pos = 0; - metalogfile->next = NULL; - metalogfile->ctx = ctx; -} - -int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno) -{ - //struct metalog_instance *ctx = metalogfile->ctx; - uv_fs_t req; - int ret; - char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX]; - unsigned backup_starting_fileno, backup_fileno; - - backup_starting_fileno = metalogfile->starting_fileno; - backup_fileno = metalogfile->fileno; - generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath)); - metalogfile->starting_fileno = new_starting_fileno; - metalogfile->fileno = new_fileno; - generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath)); - - info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath); - ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL); - if (ret < 0) { - error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret)); - //++ctx->stats.fs_errors; /* this is racy, may miss some errors */ - rrd_stat_atomic_add(&global_fs_errors, 1); - /* restore previous values */ - metalogfile->starting_fileno = backup_starting_fileno; - metalogfile->fileno = backup_fileno; - } - uv_fs_req_cleanup(&req); - - return ret; -} - -int unlink_metadata_logfile(struct metadata_logfile *metalogfile) -{ - //struct metalog_instance *ctx = metalogfile->ctx; - uv_fs_t req; - int ret; - char path[RRDENG_PATH_MAX]; - - generate_metadata_logfile_path(metalogfile, path, sizeof(path)); - - ret = uv_fs_unlink(NULL, &req, path, NULL); - if (ret < 0) { - error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); -// ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - } - uv_fs_req_cleanup(&req); - - return ret; -} - -static int check_metadata_logfile_superblock(uv_file file) -{ - int ret; - struct rrdeng_metalog_sb *superblock; - uv_buf_t iov; - uv_fs_t req; - - ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - } - iov = uv_buf_init((void *)superblock, sizeof(*superblock)); - - ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); - if (ret < 0) { - error("uv_fs_read: %s", uv_strerror(ret)); - uv_fs_req_cleanup(&req); - goto error; - } - fatal_assert(req.result >= 0); - uv_fs_req_cleanup(&req); - - if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) { - error("File has invalid superblock."); - ret = UV_EINVAL; - } else { - ret = 0; - } - if (superblock->version > RRDENG_METALOG_VER) { - error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version); - } -error: - free(superblock); - return ret; -} - -void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload) -{ - struct metalog_instance *ctx = metalogfile->ctx; - char *line, *nextline, *record_end; - int ret; - - debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload); - record_end = (char *)payload + header->payload_length - 1; - *record_end = '\0'; - - for (line = payload ; line ; line = nextline) { - nextline = strchr(line, '\n'); - if (nextline) { - *nextline++ = '\0'; - } - ret = parser_action(ctx->metalog_parser_object->parser, line); - debug(D_METADATALOG, "parser_action ret:%d", ret); - if (ret) - return; /* skip record due to error */ - }; -} - -/* This function only works with buffered I/O */ -static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset) -{ -// struct metalog_instance *ctx; - uv_file file; - uv_buf_t iov; - uv_fs_t req; - int ret; - -// ctx = metalogfile->ctx; - file = metalogfile->file; - iov = uv_buf_init(buf, len); - ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL); - if (unlikely(ret < 0 && ret != req.result)) { - fatal("uv_fs_read: %s", uv_strerror(ret)); - } - if (req.result < 0) { -// ++ctx->stats.io_errors; - rrd_stat_atomic_add(&global_io_errors, 1); - error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__, - uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno); - } - uv_fs_req_cleanup(&req); -// ctx->stats.io_read_bytes += len; -// ++ctx->stats.io_read_requests; - - return ret; -} - -/* Return 0 on success */ -static int metadata_record_integrity_check(void *record) -{ - int ret; - uint32_t data_size; - struct rrdeng_metalog_record_header *header; - struct rrdeng_metalog_record_trailer *trailer; - uLong crc; - - header = record; - data_size = header->header_length + header->payload_length; - trailer = record + data_size; - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, record, data_size); - ret = crc32cmp(trailer->checksum, crc); - - return ret; -} - -#define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */ - -/* - * Iterates metadata log file records and creates database objects (host/chart/dimension) - */ -static void iterate_records(struct metadata_logfile *metalogfile) -{ - uint32_t file_size, pos, bytes_remaining, record_size; - void *buf; - struct rrdeng_metalog_record_header *header; - struct metalog_instance *ctx = metalogfile->ctx; - struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private; - const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) + - sizeof(header->header_length); - - file_size = metalogfile->pos; - state->metalogfile = metalogfile; - - buf = mallocz(MAX_READ_BYTES); - - for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) { - bytes_remaining = file_size - pos; - if (bytes_remaining < min_header_size) { - error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, - metalogfile->fileno); - break; - } - if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0) - break; - header = (struct rrdeng_metalog_record_header *)buf; - if (METALOG_STORE_PADDING == header->type) { - info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, - metalogfile->fileno); - record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos; - continue; - } - if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size, - pos + min_header_size) < 0) - break; - record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer); - if (header->header_length < min_header_size || record_size > bytes_remaining) { - error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno, - metalogfile->fileno); - break; - } - if (record_size > MAX_READ_BYTES) { - error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size, - metalogfile->starting_fileno, metalogfile->fileno); - continue; - } - if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header), - pos + sizeof(*header)) < 0) - break; - if (metadata_record_integrity_check(buf)) { - error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos); - continue; - } - debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__, - pos); - - replay_record(metalogfile, header, buf + header->header_length); - } - - freez(buf); -} - -int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile) -{ - UNUSED(ctx); - uv_fs_t req; - uv_file file; - int ret, fd, error; - uint64_t file_size; - char path[RRDENG_PATH_MAX]; - - generate_metadata_logfile_path(metalogfile, path, sizeof(path)); - if (file_is_migrated(path)) - return 0; - - fd = open_file_buffered_io(path, O_RDWR, &file); - if (fd < 0) { -// ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - return fd; - } - info("Loading metadata log \"%s\".", path); - - ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb)); - if (ret) - goto error; - - ret = check_metadata_logfile_superblock(file); - if (ret) - goto error; -// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb); -// ++ctx->stats.io_read_requests; - - metalogfile->file = file; - metalogfile->pos = file_size; - - iterate_records(metalogfile); - - info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size); - add_migrated_file(path, file_size); - return 0; - -error: - error = ret; - ret = uv_fs_close(NULL, &req, file, NULL); - if (ret < 0) { - error("uv_fs_close(%s): %s", path, uv_strerror(ret)); -// ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - } - uv_fs_req_cleanup(&req); - return error; -} - -static int scan_metalog_files_cmp(const void *a, const void *b) -{ - struct metadata_logfile *file1, *file2; - char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; - - file1 = *(struct metadata_logfile **)a; - file2 = *(struct metadata_logfile **)b; - generate_metadata_logfile_path(file1, path1, sizeof(path1)); - generate_metadata_logfile_path(file2, path2, sizeof(path2)); - return strcmp(path1, path2); -} - -/* Returns number of metadata logfiles that were loaded or < 0 on error */ -static int scan_metalog_files(struct metalog_instance *ctx) -{ - int ret; - unsigned starting_no, no, matched_files, i, failed_to_load; - static uv_fs_t req; - uv_dirent_t dent; - struct metadata_logfile **metalogfiles, *metalogfile; - char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; - - ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); - if (ret < 0) { - fatal_assert(req.result < 0); - uv_fs_req_cleanup(&req); - error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); -// ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - return ret; - } - info("Found %d files in path %s", ret, dbfiles_path); - - metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles)); - for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s/%s\"", dbfiles_path, dent.name); - ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no); - if (2 == ret) { - info("Matched file \"%s/%s\"", dbfiles_path, dent.name); - metalogfile = mallocz(sizeof(*metalogfile)); - metadata_logfile_init(metalogfile, ctx, starting_no, no); - metalogfiles[matched_files++] = metalogfile; - } - } - uv_fs_req_cleanup(&req); - - if (0 == matched_files) { - freez(metalogfiles); - return 0; - } - if (matched_files == MAX_DATAFILES) { - error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); - } - qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp); - ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files); - if (ret) { /* If the files are corrupted fail */ - for (i = 0 ; i < matched_files ; ++i) { - freez(metalogfiles[i]); - } - freez(metalogfiles); - return UV_EINVAL; - } - //ctx->last_fileno = metalogfiles[matched_files - 1]->fileno; - - struct plugind cd = { - .enabled = 1, - .update_every = 0, - .pid = 0, - .serial_failures = 0, - .successful_collections = 0, - .obsolete = 0, - .started_t = INVALID_TIME, - .next = NULL, - .version = 0, - }; - - struct metalog_pluginsd_state metalog_parser_state; - metalog_pluginsd_state_init(&metalog_parser_state, ctx); - - PARSER_USER_OBJECT metalog_parser_object = { - .enabled = cd.enabled, - .host = ctx->rrdeng_ctx->host, - .cd = &cd, - .trust_durations = 0, - .private = &metalog_parser_state - }; - - PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT); - parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host); - parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context); - parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone); - parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action; - parser->plugins_action->chart_action = &metalog_pluginsd_chart_action; - parser->plugins_action->guid_action = &metalog_pluginsd_guid_action; - parser->plugins_action->context_action = &metalog_pluginsd_context_action; - parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action; - parser->plugins_action->host_action = &metalog_pluginsd_host_action; - - - metalog_parser_object.parser = parser; - ctx->metalog_parser_object = &metalog_parser_object; - - for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { - metalogfile = metalogfiles[i]; - db_lock(); - db_execute("BEGIN TRANSACTION;"); - ret = load_metadata_logfile(ctx, metalogfile); - if (0 != ret) { - error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL - METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); - unlink_metadata_logfile(metalogfile); - ++failed_to_load; - db_execute("ROLLBACK TRANSACTION;"); - } - else - db_execute("COMMIT TRANSACTION;"); - db_unlock(); - freez(metalogfile); - } - matched_files -= failed_to_load; - debug(D_METADATALOG, "PARSER ended"); - - parser_destroy(parser); - - size_t count __maybe_unused = metalog_parser_object.count; - - debug(D_METADATALOG, "Parsing count=%u", (unsigned)count); - - freez(metalogfiles); - return matched_files; -} - -/* Return 0 on success. */ -int init_metalog_files(struct metalog_instance *ctx) -{ - int ret; - char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path; - - ret = scan_metalog_files(ctx); - if (ret < 0) { - error("Failed to scan path \"%s\".", dbfiles_path); - return ret; - }/* else if (0 == ret) { - ctx->last_fileno = 1; - }*/ - - return 0; -} diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h deleted file mode 100644 index df12ac71..00000000 --- a/database/engine/metadata_log/logfile.h +++ /dev/null @@ -1,39 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_LOGFILE_H -#define NETDATA_LOGFILE_H - -#include "metadatalogprotocol.h" -#include "../rrdengine.h" - -/* Forward declarations */ -struct metadata_logfile; -struct metalog_worker_config; - -#define METALOG_PREFIX "metadatalog-" -#define METALOG_EXTENSION ".mlf" - -/* only one event loop is supported for now */ -struct metadata_logfile { - unsigned fileno; /* Starts at 1 */ - unsigned starting_fileno; /* 0 for normal files, staring number during compaction */ - uv_file file; - uint64_t pos; - struct metalog_instance *ctx; - struct metadata_logfile *next; -}; - -struct metadata_logfile_list { - struct metadata_logfile *first; /* oldest */ - struct metadata_logfile *last; /* newest */ -}; - -extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen); -extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, - unsigned new_fileno); -extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); -extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile); -extern int init_metalog_files(struct metalog_instance *ctx); - - -#endif /* NETDATA_LOGFILE_H */ diff --git a/database/engine/metadata_log/metadatalog.h b/database/engine/metadata_log/metadatalog.h deleted file mode 100644 index 483036a9..00000000 --- a/database/engine/metadata_log/metadatalog.h +++ /dev/null @@ -1,28 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_METADATALOG_H -#define NETDATA_METADATALOG_H - -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif -#include "../rrdengine.h" -#include "metadatalogprotocol.h" -#include "logfile.h" -#include "metadatalogapi.h" -#include "compaction.h" - -/* Forward declarations */ -struct metalog_instance; -struct parser_user_object; - -#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u" -#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u" - -struct metalog_instance { - struct rrdengine_instance *rrdeng_ctx; - struct parser_user_object *metalog_parser_object; - uint8_t initialized; /* set to 1 to mark context initialized */ -}; - -#endif /* NETDATA_METADATALOG_H */ diff --git a/database/engine/metadata_log/metadatalogapi.c b/database/engine/metadata_log/metadatalogapi.c deleted file mode 100755 index b206cca0..00000000 --- a/database/engine/metadata_log/metadatalogapi.c +++ /dev/null @@ -1,39 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#define NETDATA_RRD_INTERNALS - -#include "metadatalog.h" - -/* - * Returns 0 on success, negative on error - */ -int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx) -{ - struct metalog_instance *ctx; - int error; - - ctx = callocz(1, sizeof(*ctx)); - ctx->initialized = 0; - rrdeng_parent_ctx->metalog_ctx = ctx; - - ctx->rrdeng_ctx = rrdeng_parent_ctx; - error = init_metalog_files(ctx); - if (error) { - goto error_after_init_rrd_files; - } - ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */ - return 0; - -error_after_init_rrd_files: - freez(ctx); - return UV_EIO; -} - -/* This function is called by dbengine rotation logic when the metric has no writers */ -void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid) -{ - uuid_t multihost_uuid; - - delete_dimension_uuid(metric_uuid); - rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid); - delete_dimension_uuid(&multihost_uuid); -} diff --git a/database/engine/metadata_log/metadatalogapi.h b/database/engine/metadata_log/metadatalogapi.h deleted file mode 100644 index d558b931..00000000 --- a/database/engine/metadata_log/metadatalogapi.h +++ /dev/null @@ -1,12 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_METADATALOGAPI_H -#define NETDATA_METADATALOGAPI_H - -extern void metalog_commit_delete_chart(RRDSET *st); -extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid); - -/* must call once before using anything */ -extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx); - -#endif /* NETDATA_METADATALOGAPI_H */ diff --git a/database/engine/metadata_log/metadatalogprotocol.h b/database/engine/metadata_log/metadatalogprotocol.h deleted file mode 100644 index 1017213a..00000000 --- a/database/engine/metadata_log/metadatalogprotocol.h +++ /dev/null @@ -1,53 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_METADATALOGPROTOCOL_H -#define NETDATA_METADATALOGPROTOCOL_H - -#include "../rrddiskprotocol.h" - -#define RRDENG_METALOG_MAGIC "netdata-metadata-log" - -#define RRDENG_METALOG_VER (1) - -#define RRDENG_METALOG_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + sizeof(uint16_t))) -/* - * Metadata log persistent super-block - */ -struct rrdeng_metalog_sb { - char magic_number[RRDENG_MAGIC_SZ]; - uint16_t version; - uint8_t padding[RRDENG_METALOG_SB_PADDING_SZ]; -} __attribute__ ((packed)); - -/* - * Metadata log record types - */ -#define METALOG_STORE_PADDING (0) -#define METALOG_CREATE_OBJECT (1) -#define METALOG_DELETE_OBJECT (2) -#define METALOG_OTHER (3) /* reserved */ - -/* - * Metadata log record header - */ -struct rrdeng_metalog_record_header { - /* when set to METALOG_STORE_PADDING jump to start of next block */ - uint8_t type; - - uint16_t header_length; - uint32_t payload_length; - /****************************************************** - * No fields above this point can ever change. * - ****************************************************** - * All fields below this point are subject to change. * - ******************************************************/ -} __attribute__ ((packed)); - -/* - * Metadata log record trailer - */ -struct rrdeng_metalog_record_trailer { - uint8_t checksum[CHECKSUM_SZ]; /* CRC32 */ -} __attribute__ ((packed)); - -#endif /* NETDATA_METADATALOGPROTOCOL_H */ diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c deleted file mode 100755 index a5301bc1..00000000 --- a/database/engine/metadata_log/metalogpluginsd.c +++ /dev/null @@ -1,140 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#define NETDATA_RRD_INTERNALS - -#include "metadatalog.h" -#include "metalogpluginsd.h" - -extern struct config stream_config; - -PARSER_RC metalog_pluginsd_host_action( - void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, - char *tags) -{ - struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; - - RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); - if (host) { - if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) { - error("Archived host '%s' has memory mode '%s', but the archived one is '%s'. Ignoring archived state.", - host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), - rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE)); - ((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */ - } - ((PARSER_USER_OBJECT *) user)->host = host; - return PARSER_RC_OK; - } - - if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) { - ((PARSER_USER_OBJECT *) user)->host = host; - return PARSER_RC_OK; - } - - if (likely(!uuid_parse(machine_guid, state->host_uuid))) { - int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags, 1); - if (unlikely(rc)) { - errno = 0; - error("Failed to store host %s with UUID %s in the database", hostname, machine_guid); - } - } - else { - errno = 0; - error("Host machine GUID %s is not valid", machine_guid); - } - - return PARSER_RC_OK; -} - -PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, - char *title, char *units, char *plugin, char *module, int priority, - int update_every, RRDSET_TYPE chart_type, char *options) -{ - UNUSED(options); - - struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - - if (unlikely(uuid_is_null(state->host_uuid))) { - debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host."); - return PARSER_RC_OK; - } - uuid_copy(state->chart_uuid, state->uuid); - uuid_clear(state->uuid); /* Consume UUID */ - (void) sql_store_chart(&state->chart_uuid, &state->host_uuid, - type, id, name, family, context, title, units, - plugin, module, priority, update_every, - chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1); - ((PARSER_USER_OBJECT *)user)->st_exists = 1; - - return PARSER_RC_OK; -} - -PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, - long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type) -{ - struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; - UNUSED(user); - UNUSED(options); - UNUSED(algorithm); - UNUSED(st); - - if (unlikely(uuid_is_null(state->chart_uuid))) { - debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart."); - info("Ignoring dimension belonging to missing or ignored chart."); - return PARSER_RC_OK; - } - - if (unlikely(uuid_is_null(state->uuid))) { - debug(D_METADATALOG, "Ignoring dimension without unknown UUID"); - info("Ignoring dimension without unknown UUID"); - return PARSER_RC_OK; - } - - (void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type); - uuid_clear(state->uuid); /* Consume UUID */ - - return PARSER_RC_OK; -} - -PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid) -{ - struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; - - uuid_copy(state->uuid, *uuid); - - return PARSER_RC_OK; -} - -PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid) -{ - struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private; - - int rc = find_uuid_type(uuid); - - if (rc == 1) { - uuid_copy(state->host_uuid, *uuid); - ((PARSER_USER_OBJECT *)user)->st_exists = 0; - ((PARSER_USER_OBJECT *)user)->host_exists = 1; - } else if (rc == 2) { - uuid_copy(state->chart_uuid, *uuid); - ((PARSER_USER_OBJECT *)user)->st_exists = 1; - } else - uuid_copy(state->uuid, *uuid); - - return PARSER_RC_OK; -} - -PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid) -{ - UNUSED(user); - UNUSED(uuid); - - return PARSER_RC_OK; -} - -void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx) -{ - state->ctx = ctx; - state->skip_record = 0; - uuid_clear(state->uuid); - state->metalogfile = NULL; -} diff --git a/database/engine/metadata_log/metalogpluginsd.h b/database/engine/metadata_log/metalogpluginsd.h deleted file mode 100644 index 4fd8c390..00000000 --- a/database/engine/metadata_log/metalogpluginsd.h +++ /dev/null @@ -1,33 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_METALOGPLUGINSD_H -#define NETDATA_METALOGPLUGINSD_H - -#include "collectors/plugins.d/pluginsd_parser.h" -#include "collectors/plugins.d/plugins_d.h" -#include "parser/parser.h" - -struct metalog_pluginsd_state { - struct metalog_instance *ctx; - uuid_t uuid; - uuid_t host_uuid; - uuid_t chart_uuid; - uint8_t skip_record; /* skip this record due to errors in parsing */ - struct metadata_logfile *metalogfile; /* current metadata log file being replayed */ -}; - -extern void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct metalog_instance *ctx); - -extern PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, - char *context, char *title, char *units, char *plugin, char *module, - int priority, int update_every, RRDSET_TYPE chart_type, char *options); -extern PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, - long multiplier, long divisor, char *options, - RRD_ALGORITHM algorithm_type); -extern PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid); -extern PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid); -extern PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid); -extern PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC metalog_pluginsd_host_action(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone, char *tags); - -#endif /* NETDATA_METALOGPLUGINSD_H */ diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 39f7642d..d65cb35a 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -4,8 +4,8 @@ #include "rrdengine.h" ARAL page_descr_aral = { - .element_size = sizeof(struct rrdeng_page_descr), - .elements = 20000, + .requested_element_size = sizeof(struct rrdeng_page_descr), + .initial_elements = 20000, .filename = "page_descriptors", .cache_dir = &netdata_configured_cache_dir, .use_mmap = false, @@ -127,12 +127,13 @@ struct rrdeng_page_descr *pg_cache_create_descr(void) descr = rrdeng_page_descr_mallocz(); descr->page_length = 0; - descr->start_time = INVALID_TIME; - descr->end_time = INVALID_TIME; + descr->start_time_ut = INVALID_TIME; + descr->end_time_ut = INVALID_TIME; descr->id = NULL; descr->extent = NULL; descr->pg_cache_descr_state = 0; descr->pg_cache_descr = NULL; + descr->update_every_s = 0; return descr; } @@ -476,7 +477,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); uv_rwlock_wrlock(&page_index->lock); - ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); if (unlikely(0 == ret)) { uv_rwlock_wrunlock(&page_index->lock); if (unlikely(debug_flags & D_RRDENGINE)) { @@ -506,7 +507,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d while (!pg_cache_try_get_unsafe(descr, 1)) { debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); pg_cache_wait_event_unsafe(descr); } } @@ -517,7 +518,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); pg_cache_wait_event_unsafe(descr); } } @@ -548,8 +549,8 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t { usec_t pg_start, pg_end; - pg_start = descr->start_time; - pg_end = descr->end_time; + pg_start = descr->start_time_ut; + pg_end = descr->end_time_ut; return (pg_start < start_time && pg_end >= start_time) || (pg_start >= start_time && pg_start <= end_time); @@ -557,7 +558,7 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) { - return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); + return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut); } /* The caller must hold the page index lock */ @@ -592,14 +593,14 @@ static inline struct rrdeng_page_descr * /* Update metric oldest and latest timestamps efficiently when adding new values */ void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) { - usec_t oldest_time = page_index->oldest_time; - usec_t latest_time = page_index->latest_time; + usec_t oldest_time = page_index->oldest_time_ut; + usec_t latest_time = page_index->latest_time_ut; - if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) { - page_index->oldest_time = descr->start_time; + if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) { + page_index->oldest_time_ut = descr->start_time_ut; } - if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) { - page_index->latest_time = descr->end_time; + if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) { + page_index->latest_time_ut = descr->end_time_ut; } } @@ -618,23 +619,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0); if (likely(NULL != firstPValue)) { descr = *firstPValue; - oldest_time = descr->start_time; + oldest_time = descr->start_time_ut; } lastIndex = (Word_t)-1; lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0); if (likely(NULL != lastPValue)) { descr = *lastPValue; - latest_time = descr->end_time; + latest_time = descr->end_time_ut; } uv_rwlock_rdunlock(&page_index->lock); if (unlikely(NULL == firstPValue)) { fatal_assert(NULL == lastPValue); - page_index->oldest_time = page_index->latest_time = INVALID_TIME; + page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME; return; } - page_index->oldest_time = oldest_time; - page_index->latest_time = latest_time; + page_index->oldest_time_ut = oldest_time; + page_index->latest_time_ut = latest_time; } /* If index is NULL lookup by UUID (descr->id) */ @@ -669,7 +670,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index } uv_rwlock_wrlock(&page_index->lock); - PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); + PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); *PValue = descr; ++page_index->page_count; pg_cache_add_new_metric_time(page_index, descr); @@ -681,7 +682,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } -usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; @@ -699,25 +700,25 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, } uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time, end_time); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); if (NULL == descr) { uv_rwlock_rdunlock(&page_index->lock); return INVALID_TIME; } uv_rwlock_rdunlock(&page_index->lock); - return descr->start_time; + return descr->start_time_ut; } /** * Return page information for the first page before point_in_time that satisfies the filter. * @param ctx DB context * @param page_index page index of a metric - * @param point_in_time the pages that are searched must be older than this timestamp + * @param point_in_time_ut the pages that are searched must be older than this timestamp * @param filter decides if the page satisfies the caller's criteria * @param page_info the result of the search is set in this pointer */ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, - usec_t point_in_time, pg_cache_page_info_filter_t *filter, + usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter, struct rrdeng_page_info *page_info) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -728,7 +729,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c (void)pg_cache; fatal_assert(NULL != page_index); - Index = (Word_t)(point_in_time / USEC_PER_SEC); + Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); uv_rwlock_rdlock(&page_index->lock); do { PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); @@ -736,12 +737,12 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c } while (descr != NULL && !filter(descr)); if (unlikely(NULL == descr)) { page_info->page_length = 0; - page_info->start_time = INVALID_TIME; - page_info->end_time = INVALID_TIME; + page_info->start_time_ut = INVALID_TIME; + page_info->end_time_ut = INVALID_TIME; } else { page_info->page_length = descr->page_length; - page_info->start_time = descr->start_time; - page_info->end_time = descr->end_time; + page_info->start_time_ut = descr->start_time_ut; + page_info->end_time_ut = descr->end_time_ut; } uv_rwlock_rdunlock(&page_index->lock); } @@ -750,7 +751,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference. * @param ctx DB context * @param id lookup by UUID - * @param start_time exact starting time in usec + * @param start_time_ut exact starting time in usec * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. * @return the page descriptor or NULL on failure. It can fail if: * 1. The page is already allocated to the page cache. @@ -758,7 +759,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c * 3. It did not succeed to reserve a spot in the page cache. */ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, - usec_t start_time) + usec_t start_time_ut) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; @@ -781,7 +782,7 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_ } uv_rwlock_rdlock(&page_index->lock); - Index = (Word_t)(start_time / USEC_PER_SEC); + Index = (Word_t)(start_time_ut / USEC_PER_SEC); PValue = JudyLGet(page_index->JudyL_array, Index, PJE0); if (likely(NULL != PValue)) { descr = *PValue; @@ -818,15 +819,15 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_ * Does not get a reference. * @param ctx DB context * @param id UUID - * @param start_time inclusive starting time in usec - * @param end_time inclusive ending time in usec + * @param start_time_ut inclusive starting time in usec + * @param end_time_ut inclusive ending time in usec * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). * If page_info_arrayp is set to NULL nothing was allocated. * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. * @return the number of pages that overlap with the time range [start_time,end_time]. */ -unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, +unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut, struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -854,14 +855,14 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta } uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time, end_time); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); if (NULL == descr) { uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); *ret_page_indexp = NULL; return 0; } else { - Index = (Word_t)(descr->start_time / USEC_PER_SEC); + Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC); } if (page_info_arrayp) { page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); @@ -869,7 +870,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta } for (count = 0, preload_count = 0 ; - descr != NULL && is_page_in_time_range(descr, start_time, end_time) ; + descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { /* Iterate all pages in range */ @@ -881,8 +882,8 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); } - (*page_info_arrayp)[count].start_time = descr->start_time; - (*page_info_arrayp)[count].end_time = descr->end_time; + (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut; + (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut; (*page_info_arrayp)[count].page_length = descr->page_length; } ++count; @@ -974,7 +975,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta */ struct rrdeng_page_descr * pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t point_in_time) + usec_t point_in_time_ut) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; @@ -1003,15 +1004,15 @@ struct rrdeng_page_descr * page_not_in_cache = 0; uv_rwlock_rdlock(&page_index->lock); while (1) { - Index = (Word_t)(point_in_time / USEC_PER_SEC); + Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); if (likely(NULL != PValue)) { descr = *PValue; } if (NULL == PValue || 0 == descr->page_length || - (INVALID_TIME != point_in_time && - !is_point_in_time_in_page(descr, point_in_time))) { + (INVALID_TIME != point_in_time_ut && + !is_point_in_time_in_page(descr, point_in_time_ut))) { /* non-empty page not found */ uv_rwlock_rdunlock(&page_index->lock); @@ -1038,7 +1039,7 @@ struct rrdeng_page_descr * debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { pg_cache_wait_event_unsafe(descr); } @@ -1053,7 +1054,7 @@ struct rrdeng_page_descr * uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); if (!(flags & RRD_PAGE_POPULATED)) page_not_in_cache = 1; pg_cache_wait_event_unsafe(descr); @@ -1081,7 +1082,7 @@ struct rrdeng_page_descr * */ struct rrdeng_page_descr * pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t start_time, usec_t end_time) + usec_t start_time_ut, usec_t end_time_ut) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; @@ -1110,7 +1111,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_rdlock(&page_index->lock); int retry_count = 0; while (1) { - descr = find_first_page_in_time_range(page_index, start_time, end_time); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) { /* non-empty page not found */ if (retry_count == default_rrdeng_page_fetch_retries) @@ -1140,7 +1141,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { pg_cache_wait_event_unsafe(descr); } @@ -1155,7 +1156,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); if (!(flags & RRD_PAGE_POPULATED)) page_not_in_cache = 1; @@ -1180,7 +1181,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index return descr; } -struct pg_cache_page_index *create_page_index(uuid_t *id) +struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx) { struct pg_cache_page_index *page_index; @@ -1188,11 +1189,15 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) page_index->JudyL_array = (Pvoid_t) NULL; uuid_copy(page_index->id, *id); fatal_assert(0 == uv_rwlock_init(&page_index->lock)); - page_index->oldest_time = INVALID_TIME; - page_index->latest_time = INVALID_TIME; + page_index->oldest_time_ut = INVALID_TIME; + page_index->latest_time_ut = INVALID_TIME; page_index->prev = NULL; page_index->page_count = 0; + page_index->refcount = 0; page_index->writers = 0; + page_index->ctx = ctx; + page_index->alignment = NULL; + page_index->latest_update_every_s = default_rrd_update_every; return page_index; } @@ -1238,24 +1243,6 @@ void init_page_cache(struct rrdengine_instance *ctx) init_committed_page_index(ctx); } - - -/* - * METRIC # number - * 1. INDEX: JudyHS # bytes - * 2. DATA: page_index # bytes - * - * PAGE (1 page of 1 metric) # number - * 1. INDEX AT METRIC: page_index->JudyL_array # bytes - * 2. DATA: descr # bytes - * - * PAGE CACHE (1 page of 1 metric at the cache) # number - * 1. pg_cache_descr (if PG_CACHE_DESCR_ALLOCATED) # bytes - * 2. data (if RRD_PAGE_POPULATED) # bytes - * - */ - - void free_page_cache(struct rrdengine_instance *ctx) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -1265,30 +1252,15 @@ void free_page_cache(struct rrdengine_instance *ctx) struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; - Word_t metrics_number = 0, - metrics_bytes = 0, - metrics_index_bytes = 0, - metrics_duration = 0; - - Word_t pages_number = 0, - pages_bytes = 0, - pages_index_bytes = 0; - - Word_t pages_size_per_type[256] = { 0 }, - pages_count_per_type[256] = { 0 }; - - Word_t cache_pages_number = 0, - cache_pages_bytes = 0, - cache_pages_data_bytes = 0; - - size_t points_in_db = 0, - uncompressed_points_size = 0, - seconds_in_db = 0, - single_point_pages = 0; - - Word_t pages_dirty_index_bytes = 0; - - usec_t oldest_time_ut = LONG_MAX, latest_time_ut = 0; + // if we are exiting, the OS will recover all memory so do not slow down the shutdown process + // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS + // This affects the reporting of dbengine statistics which are available in real time + // via the /api/v1/dbengine_stats endpoint +#ifndef NETDATA_DBENGINE_FREE + if (netdata_exit) + return; +#endif + Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0; /* Free committed page index */ pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); @@ -1305,116 +1277,30 @@ void free_page_cache(struct rrdengine_instance *ctx) PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); descr = unlikely(NULL == PValue) ? NULL : *PValue; - size_t metric_duration = 0; - size_t metric_update_every = 0; - size_t metric_single_point_pages = 0; - while (descr != NULL) { /* Iterate all page descriptors of this metric */ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { - cache_pages_number++; - /* Check rrdenglocking.c */ pg_cache_descr = descr->pg_cache_descr; if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { dbengine_page_free(pg_cache_descr->page); - cache_pages_data_bytes += RRDENG_BLOCK_SIZE; } rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); - cache_pages_bytes += sizeof(*pg_cache_descr); } - - if(descr->start_time < oldest_time_ut) - oldest_time_ut = descr->start_time; - - if(descr->end_time > latest_time_ut) - latest_time_ut = descr->end_time; - - pages_size_per_type[descr->type] += descr->page_length; - pages_count_per_type[descr->type]++; - - size_t points_in_page = (descr->page_length / PAGE_POINT_SIZE_BYTES(descr)); - size_t page_duration = ((descr->end_time - descr->start_time) / USEC_PER_SEC); - size_t update_every = (page_duration == 0) ? 1 : page_duration / (points_in_page - 1); - - if (!page_duration && metric_update_every) { - page_duration = metric_update_every; - update_every = metric_update_every; - } - else if(page_duration) - metric_update_every = update_every; - - uncompressed_points_size += descr->page_length; - - if(page_duration > 0) { - page_duration = update_every * points_in_page; - metric_duration += page_duration; - seconds_in_db += page_duration; - points_in_db += descr->page_length / PAGE_POINT_SIZE_BYTES(descr); - } - else - metric_single_point_pages++; - rrdeng_page_descr_freez(descr); - pages_bytes += sizeof(*descr); - pages_number++; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); descr = unlikely(NULL == PValue) ? NULL : *PValue; } - if(metric_single_point_pages && metric_update_every) { - points_in_db += metric_single_point_pages; - seconds_in_db += metric_update_every * metric_single_point_pages; - metric_duration += metric_update_every * metric_single_point_pages; - } - else - single_point_pages += metric_single_point_pages; - /* Free page index */ pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0); fatal_assert(NULL == page_index->JudyL_array); freez(page_index); - - metrics_number++; - metrics_bytes += sizeof(*page_index); - metrics_duration += metric_duration; } /* Free metrics index */ metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); - - if(!metrics_number) metrics_number = 1; - if(!pages_number) pages_number = 1; - if(!cache_pages_number) cache_pages_number = 1; - if(!points_in_db) points_in_db = 1; - if(latest_time_ut == oldest_time_ut) oldest_time_ut -= USEC_PER_SEC; - - if(single_point_pages) { - long double avg_duration = (long double)seconds_in_db / points_in_db; - points_in_db += single_point_pages; - seconds_in_db += (size_t)(avg_duration * single_point_pages); - } - - info("DBENGINE STATISTICS ON METRICS:" - " Metrics: %lu (structures %lu bytes - per metric %0.2f, index (HS) %lu bytes - per metric %0.2f bytes - duration %zu secs) |" - " Page descriptors: %lu (structures %lu bytes - per page %0.2f bytes, index (L) %lu bytes - per page %0.2f, dirty index %lu bytes). |" - " Page cache: %lu pages (structures %lu bytes - per page %0.2f bytes, data %lu bytes). |" - " Points in db %zu, uncompressed size of points database %zu bytes. |" - " Duration of all points %zu seconds, average point duration %0.2f seconds." - " Duration of the database %llu seconds, average metric duration %0.2f seconds, average metric lifetime %0.2f%%." - , metrics_number, metrics_bytes, (double)metrics_bytes/metrics_number, metrics_index_bytes, (double)metrics_index_bytes/metrics_number, metrics_duration - , pages_number, pages_bytes, (double)pages_bytes/pages_number, pages_index_bytes, (double)pages_index_bytes/pages_number, pages_dirty_index_bytes - , cache_pages_number, cache_pages_bytes, (double)cache_pages_bytes/cache_pages_number, cache_pages_data_bytes - , points_in_db, uncompressed_points_size - , seconds_in_db, (double)seconds_in_db/points_in_db - , (latest_time_ut - oldest_time_ut) / USEC_PER_SEC, (double)metrics_duration/metrics_number - , (double)metrics_duration/metrics_number * 100.0 / ((latest_time_ut - oldest_time_ut) / USEC_PER_SEC) - ); - - for(int i = 0; i < 256 ;i++) { - if(pages_count_per_type[i]) - info("DBENGINE STATISTICS ON PAGE TYPES: page type %d total pages %lu, average page size %0.2f bytes", i, pages_count_per_type[i], (double)pages_size_per_type[i]/pages_count_per_type[i]); - } + info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes); } diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index b938b9e0..2f4d6b33 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -60,18 +60,19 @@ struct rrdeng_page_descr { volatile unsigned long pg_cache_descr_state; /* page information */ - usec_t start_time; - usec_t end_time; - uint32_t page_length; + usec_t start_time_ut; + usec_t end_time_ut; + uint32_t update_every_s:24; uint8_t type; + uint32_t page_length; }; #define PAGE_INFO_SCRATCH_SZ (8) struct rrdeng_page_info { uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */ - usec_t start_time; - usec_t end_time; + usec_t start_time_ut; + usec_t end_time_ut; uint32_t page_length; }; @@ -80,6 +81,11 @@ typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *); #define PAGE_CACHE_MAX_PRELOAD_PAGES (256) +struct pg_alignment { + uint32_t page_length; + uint32_t refcount; +}; + /* maps time ranges to pages */ struct pg_cache_page_index { uuid_t id; @@ -89,6 +95,7 @@ struct pg_cache_page_index { */ Pvoid_t JudyL_array; Word_t page_count; + unsigned short refcount; unsigned short writers; uv_rwlock_t lock; @@ -96,13 +103,17 @@ struct pg_cache_page_index { * Only one effective writer, data deletion workqueue. * It's also written during the DB loading phase. */ - usec_t oldest_time; + usec_t oldest_time_ut; /* * Only one effective writer, data collection thread. * It's also written by the data deletion workqueue when data collection is disabled for this metric. */ - usec_t latest_time; + usec_t latest_time_ut; + + struct rrdengine_instance *ctx; + struct pg_alignment *alignment; + uint32_t latest_update_every_s; struct pg_cache_page_index *prev; }; @@ -152,93 +163,93 @@ struct page_cache { /* TODO: add statistics */ unsigned populated_pages; }; -extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); -extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); -extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, +void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr); +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr); +unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, +void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, +void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern struct rrdeng_page_descr *pg_cache_create_descr(void); -extern int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); -extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); -extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, +struct rrdeng_page_descr *pg_cache_create_descr(void); +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access); +void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); +void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, struct rrdeng_page_descr *descr); -extern uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, uint8_t is_exclusive_holder, uuid_t *metric_id); -extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, - usec_t start_time, usec_t end_time); -extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, - usec_t point_in_time, pg_cache_page_info_filter_t *filter, +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time_ut, usec_t end_time_ut); +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter, struct rrdeng_page_info *page_info); -extern struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, - usec_t start_time); -extern unsigned - pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, +struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, + usec_t start_time_ut); +unsigned + pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut, struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp); -extern struct rrdeng_page_descr * +struct rrdeng_page_descr * pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t point_in_time); -extern struct rrdeng_page_descr * + usec_t point_in_time_ut); +struct rrdeng_page_descr * pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t start_time, usec_t end_time); -extern struct pg_cache_page_index *create_page_index(uuid_t *id); -extern void init_page_cache(struct rrdengine_instance *ctx); -extern void free_page_cache(struct rrdengine_instance *ctx); -extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); -extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); -extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); -extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); -extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); - -extern void rrdeng_page_descr_aral_go_singlethreaded(void); -extern void rrdeng_page_descr_aral_go_multithreaded(void); -extern void rrdeng_page_descr_use_malloc(void); -extern void rrdeng_page_descr_use_mmap(void); -extern bool rrdeng_page_descr_is_mmap(void); -extern struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void); -extern void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr); + usec_t start_time_ut, usec_t end_time_ut); +struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx); +void init_page_cache(struct rrdengine_instance *ctx); +void free_page_cache(struct rrdengine_instance *ctx); +void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); +void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); +unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); +unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); + +void rrdeng_page_descr_aral_go_singlethreaded(void); +void rrdeng_page_descr_aral_go_multithreaded(void); +void rrdeng_page_descr_use_malloc(void); +void rrdeng_page_descr_use_mmap(void); +bool rrdeng_page_descr_is_mmap(void); +struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void); +void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr); static inline void - pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp) + pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_time_ut_p, uint32_t *page_lengthp) { - usec_t end_time, old_end_time; + usec_t end_time_ut, old_end_time_ut; uint32_t page_length; if (NULL == descr->extent) { /* this page is currently being modified, get consistent info locklessly */ do { - end_time = descr->end_time; + end_time_ut = descr->end_time_ut; __sync_synchronize(); - old_end_time = end_time; + old_end_time_ut = end_time_ut; page_length = descr->page_length; __sync_synchronize(); - end_time = descr->end_time; + end_time_ut = descr->end_time_ut; __sync_synchronize(); - } while ((end_time != old_end_time || (end_time & 1) != 0)); + } while ((end_time_ut != old_end_time_ut || (end_time_ut & 1) != 0)); - *end_timep = end_time; + *end_time_ut_p = end_time_ut; *page_lengthp = page_length; } else { - *end_timep = descr->end_time; + *end_time_ut_p = descr->end_time_ut; *page_lengthp = descr->page_length; } } /* The caller must hold a reference to the page and must have already set the new data */ -static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length) +static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time_ut, uint32_t page_length) { - fatal_assert(!(end_time & 1)); + fatal_assert(!(end_time_ut & 1)); __sync_synchronize(); - descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */ + descr->end_time_ut |= 1; /* mark start of uncertainty period by adding 1 microsecond */ __sync_synchronize(); descr->page_length = page_length; __sync_synchronize(); - descr->end_time = end_time; /* mark end of uncertainty period */ + descr->end_time_ut = end_time_ut; /* mark end of uncertainty period */ } #endif /* NETDATA_PAGECACHE_H */ diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h index cb57385a..5b4be949 100644 --- a/database/engine/rrddiskprotocol.h +++ b/database/engine/rrddiskprotocol.h @@ -46,8 +46,8 @@ struct rrdeng_extent_page_descr { uint8_t uuid[UUID_SZ]; uint32_t page_length; - uint64_t start_time; - uint64_t end_time; + uint64_t start_time_ut; + uint64_t end_time_ut; } __attribute__ ((packed)); /* diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 8b35051d..e4cd37e9 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -30,7 +30,7 @@ void dbengine_page_free(void *page) { if (unlikely(db_engine_use_malloc)) freez(page); else - munmap(page, RRDENG_BLOCK_SIZE); + netdata_munmap(page, RRDENG_BLOCK_SIZE); } static void sanity_check(void) @@ -206,8 +206,8 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str /* care, we don't hold the descriptor mutex */ if (!uuid_compare(*extent->pages[j]->id, *descr->id) && extent->pages[j]->page_length == descr->page_length && - extent->pages[j]->start_time == descr->start_time && - extent->pages[j]->end_time == descr->end_time) { + extent->pages[j]->start_time_ut == descr->start_time_ut && + extent->pages[j]->end_time_ut == descr->end_time_ut) { break; } page_offset += extent->pages[j]->page_length; @@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) } } -void read_extent_cb(uv_fs_t* req) +static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) { - struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; - struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; @@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req) struct rrdeng_df_extent_trailer *trailer; uLong crc; - xt_io_descr = req->data; header = xt_io_descr->buf; payload_length = header->payload_length; count = header->number_of_pages; payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); - if (req->result < 0) { + if (unlikely(read_failed)) { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); have_read_error = 1; - error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, - uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos, + xt_io_descr->bytes, datafile->tier, datafile->fileno); goto after_crc_check; } crc = crc32(0L, Z_NULL, 0); @@ -378,8 +375,8 @@ after_crc_check: /* care, we don't hold the descriptor mutex */ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) && header->descr[i].page_length == descrj->page_length && - header->descr[i].start_time == descrj->start_time && - header->descr[i].end_time == descrj->end_time) { + header->descr[i].start_time_ut == descrj->start_time_ut && + header->descr[i].end_time_ut == descrj->end_time_ut) { descr = descrj; break; } @@ -387,7 +384,7 @@ after_crc_check: is_prefetched_page = 0; if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */ descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid, - header->descr[i].start_time); + header->descr[i].start_time_ut); if (!descr) continue; /* Failed to reserve a suitable page */ is_prefetched_page = 1; @@ -421,11 +418,67 @@ after_crc_check: } if (xt_io_descr->completion) completion_mark_complete(xt_io_descr->completion); +} + +static void read_extent_cb(uv_fs_t *req) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct extent_io_descriptor *xt_io_descr; + + xt_io_descr = req->data; + do_extent_processing(wc, xt_io_descr, req->result < 0); uv_fs_req_cleanup(req); - free(xt_io_descr->buf); + posix_memfree(xt_io_descr->buf); freez(xt_io_descr); } +static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + xt_io_descr = req->data; + + if (likely(xt_io_descr->map_base)) { + do_extent_processing(wc, xt_io_descr, false); + munmap(xt_io_descr->map_base, xt_io_descr->map_length); + freez(xt_io_descr); + return; + } + + // MMAP failed, so do uv_fs_read + int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + xt_io_descr->req.data = xt_io_descr; + ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb); + fatal_assert(-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; +} + +static void do_mmap_read_extent(uv_work_t *req) +{ + struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data; + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos); + size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start; + unsigned real_io_size = xt_io_descr->bytes; + + void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start); + if (likely(data != MAP_FAILED)) { + xt_io_descr->map_base = data; + xt_io_descr->map_length = length; + xt_io_descr->buf = data + (xt_io_descr->pos - map_start); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; + } +} static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdeng_page_descr **descr, @@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdengine_instance *ctx = wc->ctx; struct page_cache_descr *pg_cache_descr; int ret; - unsigned i, size_bytes, pos, real_io_size; -// uint32_t payload_length; + unsigned i, size_bytes, pos; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; struct extent_info *extent = descr[0]->extent; @@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc, rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; -// payload_length = descr[i]->page_length; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); - xt_io_descr->descr_array[i] = descr[i]; } xt_io_descr->descr_count = count; + xt_io_descr->file = datafile->file; xt_io_descr->bytes = size_bytes; xt_io_descr->pos = pos; - xt_io_descr->req.data = xt_io_descr; + xt_io_descr->req_worker.data = xt_io_descr; xt_io_descr->completion = NULL; - /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_io_descr->buf = NULL; xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); if (xt_is_cached) { @@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc, } } - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } - real_io_size = ALIGN_BYTES_CEILING(size_bytes); - xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); - ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb); fatal_assert(-1 != ret); - ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; - ctx->stats.io_read_extent_bytes += real_io_size; ++ctx->stats.io_read_extents; ctx->stats.pg_cache_backfills += count; } @@ -696,7 +738,7 @@ void flush_pages_cb(uv_fs_t* req) if (xt_io_descr->completion) completion_mark_complete(xt_io_descr->completion); uv_fs_req_cleanup(req); - free(xt_io_descr->buf); + posix_memfree(xt_io_descr->buf); freez(xt_io_descr); uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); @@ -820,8 +862,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct header->descr[i].type = descr->type; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; - header->descr[i].start_time = descr->start_time; - header->descr[i].end_time = descr->end_time; + header->descr[i].start_time_ut = descr->start_time_ut; + header->descr[i].end_time_ut = descr->end_time_ut; pos += sizeof(header->descr[i]); } for (i = 0 ; i < count ; ++i) { @@ -922,7 +964,6 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc) wc->now_deleting_files = NULL; wc->cleanup_thread_deleting_files = 0; - aclk_data_rotated(); rrdcontext_db_rotation(); /* interrupt event loop */ @@ -948,12 +989,12 @@ static void delete_old_data(void *arg) for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); - if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) { + if (unlikely(can_delete_metric)) { /* * If the metric is empty, has no active writers and if the metadata log has been initialized then * attempt to delete the corresponding netdata dimension. */ - metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id); + metaqueue_delete_dimension_uuid(&metric_id); } } next = extent->next; @@ -1044,7 +1085,70 @@ static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) /* return 0 on success */ int init_rrd_files(struct rrdengine_instance *ctx) { - return init_data_files(ctx); + int ret = init_data_files(ctx); + + BUFFER *wb = buffer_create(1000); + size_t all_errors = 0; + usec_t now = now_realtime_usec(); + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) { + buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) { + buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) { + buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter + ); + all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter; + } + + if(all_errors) + info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb)); + + buffer_free(wb); + return ret; } void finalize_rrd_files(struct rrdengine_instance *ctx) @@ -1139,10 +1243,6 @@ void timer_cb(uv_timer_t* handle) uv_stop(handle->loop); uv_update_time(handle->loop); - if (unlikely(!ctx->metalog_ctx->initialized)) { - worker_is_idle(); - return; /* Wait for the metadata log to initialize */ - } rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { @@ -1329,7 +1429,7 @@ void rrdeng_worker(void* arg) } /* cleanup operations of the event loop */ - info("Shutting down RRD engine event loop."); + info("Shutting down RRD engine event loop for tier %d", ctx->tier); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, @@ -1344,7 +1444,7 @@ void rrdeng_worker(void* arg) wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); - info("Shutting down RRD engine event loop complete."); + info("Shutting down RRD engine event loop for tier %d complete", ctx->tier); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 4b383b62..fedadbe8 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -17,7 +17,6 @@ #include "rrdenginelib.h" #include "datafile.h" #include "journalfile.h" -#include "metadata_log/metadatalog.h" #include "rrdengineapi.h" #include "pagecache.h" #include "rrdenglocking.h" @@ -37,29 +36,25 @@ struct rrdengine_instance; #define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" struct rrdeng_collect_handle { - struct rrdeng_metric_handle *metric_handle; + struct pg_cache_page_index *page_index; struct rrdeng_page_descr *descr; unsigned long page_correlation_id; - struct rrdengine_instance *ctx; // set to 1 when this dimension is not page aligned with the other dimensions in the chart uint8_t unaligned_page; }; struct rrdeng_query_handle { - struct rrdeng_metric_handle *metric_handle; struct rrdeng_page_descr *descr; struct rrdengine_instance *ctx; struct pg_cache_page_index *page_index; - time_t next_page_time; - time_t now; + time_t wanted_start_time_s; + time_t now_s; unsigned position; unsigned entries; - TIER_QUERY_FETCH tier_query_fetch_type; storage_number *page; - usec_t page_end_time; + usec_t page_end_time_ut; uint32_t page_length; - usec_t dt; - time_t dt_sec; + time_t dt_s; }; typedef enum { @@ -110,8 +105,12 @@ struct rrdeng_cmdqueue { struct extent_io_descriptor { uv_fs_t req; + uv_work_t req_worker; uv_buf_t iov; + uv_file file; void *buf; + void *map_base; + size_t map_length; uint64_t pos; unsigned bytes; struct completion *completion; @@ -230,8 +229,16 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele #define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */ #define QUIESCED (2) /* is set after all threads have finished running */ +typedef enum { + LOAD_ERRORS_PAGE_FLIPPED_TIME = 0, + LOAD_ERRORS_PAGE_EQUAL_TIME = 1, + LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2, + LOAD_ERRORS_PAGE_UPDATE_ZERO = 3, + LOAD_ERRORS_PAGE_FLEXY_TIME = 4, + LOAD_ERRORS_DROPPED_EXTENT = 5, +} INVALID_PAGE_ID; + struct rrdengine_instance { - struct metalog_instance *metalog_ctx; struct rrdengine_worker_config worker_config; struct completion rrdengine_completion; struct page_cache pg_cache; @@ -254,16 +261,21 @@ struct rrdengine_instance { uint8_t page_type; /* Default page type for this context */ struct rrdengine_statistics stats; + + struct { + size_t counter; + usec_t latest_end_time_ut; + } load_errors[6]; }; -extern void *dbengine_page_alloc(void); -extern void dbengine_page_free(void *page); +void *dbengine_page_alloc(void); +void dbengine_page_free(void *page); -extern int init_rrd_files(struct rrdengine_instance *ctx); -extern void finalize_rrd_files(struct rrdengine_instance *ctx); -extern void rrdeng_test_quota(struct rrdengine_worker_config* wc); -extern void rrdeng_worker(void* arg); -extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); -extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc); +int init_rrd_files(struct rrdengine_instance *ctx); +void finalize_rrd_files(struct rrdengine_instance *ctx); +void rrdeng_test_quota(struct rrdengine_worker_config* wc); +void rrdeng_worker(void* arg); +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd); +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc); #endif /* NETDATA_RRDENGINE_H */ diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index f4da2940..27503bae 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" +#include "../storage_engine.h" /* Default global database instance */ struct rrdengine_instance multidb_ctx_storage_tier0; @@ -35,14 +36,31 @@ int default_multidb_disk_quota_mb = 256; /* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) { - if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0; - if(!host->storage_instance[tier]) tier = 0; - return (struct rrdengine_instance *)host->storage_instance[tier]; +// ---------------------------------------------------------------------------- +// metrics groups + +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { + return callocz(1, sizeof(struct pg_alignment)); +} + +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + if(!smg) return; + + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct page_cache *pg_cache = &ctx->pg_cache; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + if(pa->refcount == 0) + freez(pa); + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); } +// ---------------------------------------------------------------------------- +// metric handle for legacy dbs + /* This UUID is not unique across hosts */ -void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid) +void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid) { EVP_MD_CTX *evpctx; unsigned char hash_value[EVP_MAX_MD_SIZE]; @@ -75,98 +93,136 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -struct rrdeng_metric_handle { - RRDDIM *rd; - struct rrdengine_instance *ctx; - uuid_t *rrdeng_uuid; // database engine metric UUID - struct pg_cache_page_index *page_index; -}; +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) { + uuid_t legacy_uuid; + rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); + return rrdeng_metric_get(db_instance, &legacy_uuid, smg); +} -void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) { - freez(db_metric_handle); +// ---------------------------------------------------------------------------- +// metric handle + +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + + unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + if(refcount == 0 && page_index->alignment) { + __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST); + page_index->alignment = NULL; + } } -STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + return db_metric_handle; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; - struct page_cache *pg_cache; - uuid_t legacy_uuid; - uuid_t multihost_legacy_uuid; - Pvoid_t *PValue; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct page_cache *pg_cache = &ctx->pg_cache; struct pg_cache_page_index *page_index = NULL; - int is_multihost_child = 0; - RRDHOST *host = rd->rrdset->rrdhost; - - pg_cache = &ctx->pg_cache; - - rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); - if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) - is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) page_index = *PValue; - } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (is_multihost_child || NULL == PValue) { - /* First time we see the legacy UUID or metric belongs to child host in multi-host DB. - * Drop legacy support, normal path */ - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0); - fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&rd->metric_uuid); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; - uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + + if (likely(page_index)) { + __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); + + if(pa) { + if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0) + fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).", + page_index->refcount, page_index->writers); + + page_index->alignment = pa; + __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); } - } else { - /* There are legacy UUIDs in the database, implement backward compatibility */ + } - rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, - &multihost_legacy_uuid); + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); - int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid); + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct pg_alignment *pa = (struct pg_alignment *)smg; + struct pg_cache_page_index *page_index; + struct page_cache *pg_cache = &ctx->pg_cache; - uuid_copy(rd->metric_uuid, multihost_legacy_uuid); + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(uuid, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + page_index->alignment = pa; + page_index->refcount = 1; + if(pa) + pa->refcount++; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + + return (STORAGE_METRIC_HANDLE *)page_index; +} + +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + STORAGE_METRIC_HANDLE *db_metric_handle; + + db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg); + if(!db_metric_handle) { + db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg); + if(db_metric_handle) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + uuid_copy(rd->metric_uuid, page_index->id); + } + } + if(!db_metric_handle) + db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg); - if (unlikely(need_to_store && !ctx->tier)) - (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, - rd->algorithm); +#ifdef NETDATA_INTERNAL_CHECKS + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + if(uuid_compare(rd->metric_uuid, page_index->id) != 0) { + char uuid1[UUID_STR_LEN + 1]; + char uuid2[UUID_STR_LEN + 1]; + + uuid_unparse(rd->metric_uuid, uuid1); + uuid_unparse(page_index->id, uuid2); + fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2); } - struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle)); - mh->rd = rd; - mh->ctx = ctx; - mh->rrdeng_uuid = &page_index->id; - mh->page_index = page_index; - return (STORAGE_METRIC_HANDLE *)mh; + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + if(page_index->ctx != ctx) + fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx); +#endif + + return db_metric_handle; } + +// ---------------------------------------------------------------------------- +// collect ops + /* * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). */ -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) { + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; struct rrdeng_collect_handle *handle; - struct pg_cache_page_index *page_index; + + if(!page_index->alignment) + fatal("DBENGINE: metric group is required for collect operations"); handle = callocz(1, sizeof(struct rrdeng_collect_handle)); - handle->metric_handle = metric_handle; - handle->ctx = metric_handle->ctx; + handle->page_index = page_index; handle->descr = NULL; handle->unaligned_page = 0; + page_index->latest_update_every_s = update_every; - page_index = metric_handle->page_index; uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); @@ -214,7 +270,7 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct rrdengine_instance *ctx = handle->ctx; + struct rrdengine_instance *ctx = handle->page_index->ctx; struct rrdeng_page_descr *descr = handle->descr; if (unlikely(!ctx)) return; @@ -227,9 +283,7 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h page_is_empty = page_has_only_empty_metrics(descr); if (page_is_empty) { - debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "Page has empty metrics only, deleting", true); pg_cache_put(ctx, descr); pg_cache_punch_hole(ctx, descr, 1, 0, NULL); } else @@ -242,8 +296,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h handle->descr = NULL; } -void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, - usec_t point_in_time, +static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, @@ -252,11 +306,10 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, SN_FLAGS flags) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct rrdengine_instance *ctx = handle->ctx; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdengine_instance *ctx = handle->page_index->ctx; struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = handle->descr; - RRDDIM *rd = metric_handle->rd; void *page; uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; @@ -264,21 +317,33 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if (descr) { /* Make alignment decisions */ - if (descr->page_length == rd->rrdset->rrddim_page_alignment) { +#ifdef NETDATA_INTERNAL_CHECKS + if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) { + char buffer[200 + 1]; + snprintfz(buffer, 200, + "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu", + (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned", + descr->end_time_ut / USEC_PER_SEC, + point_in_time_ut / USEC_PER_SEC, + page_index->latest_update_every_s, + point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC); + print_page_cache_descr(descr, buffer, false); + } +#endif + + if (descr->page_length == page_index->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) { handle->unaligned_page = 1; - debug(D_RRDENGINE, "Metric page is not aligned with chart:"); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "Metric page is not aligned with chart", true); } if (unlikely(handle->unaligned_page && /* did the other metrics change page? */ - rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) { - debug(D_RRDENGINE, "Flushing unaligned metric page."); + page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { + print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true); must_flush_unaligned_page = 1; handle->unaligned_page = 0; } @@ -286,16 +351,21 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if (unlikely(NULL == descr || descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE || must_flush_unaligned_page)) { - rrdeng_store_metric_flush_current_page(collection_handle); - page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr); + if(descr) { + print_page_cache_descr(descr, "flushing metric", true); + rrdeng_store_metric_flush_current_page(collection_handle); + } + + page = rrdeng_create_page(ctx, &page_index->id, &descr); fatal_assert(page); + descr->update_every_s = page_index->latest_update_every_s; handle->descr = descr; handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); - if (0 == rd->rrdset->rrddim_page_alignment) { + if (0 == page_index->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } @@ -330,13 +400,13 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, break; } - pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); + pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) - rd->rrdset->rrddim_page_alignment = descr->page_length; - if (unlikely(INVALID_TIME == descr->start_time)) { + page_index->alignment->page_length = descr->page_length; + if (unlikely(INVALID_TIME == descr->start_time_ut)) { unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; - descr->start_time = point_in_time; + descr->start_time_ut = point_in_time_ut; new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { @@ -350,20 +420,111 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, } } - pg_cache_insert(ctx, metric_handle->page_index, descr); + pg_cache_insert(ctx, page_index, descr); } else { - pg_cache_add_new_metric_time(metric_handle->page_index, descr); + pg_cache_add_new_metric_time(page_index, descr); } + +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "store system.cpu, collect:%u, page_index first:%u, last:%u", +// (uint32_t)(point_in_time / USEC_PER_SEC), +// (uint32_t)(page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } +// } } +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time_ut, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags) +{ + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + struct rrdeng_page_descr *descr = handle->descr; + + if(likely(descr)) { + usec_t last_point_in_time_ut = descr->end_time_ut; + usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC; + size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ? + (size_t)0 : + (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut); + + if(unlikely(points_gap != 1)) { + if (unlikely(points_gap <= 0)) { + time_t now = now_realtime_sec(); + static __thread size_t counter = 0; + static __thread time_t last_time_logged = 0; + counter++; + + if(now - last_time_logged > 600) { + error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.", + counter, (size_t)(last_time_logged?(now - last_time_logged):0)); + + last_time_logged = now; + counter = 0; + } + return; + } + + size_t point_size = PAGE_POINT_SIZE_BYTES(descr); + size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size; + size_t used_points = descr->page_length / point_size; + size_t remaining_points_in_page = page_size_in_points - used_points; + + bool new_point_is_aligned = true; + if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut)) + new_point_is_aligned = false; + + if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + rrdeng_store_metric_flush_current_page(collection_handle); + } + else { +// char buffer[200]; +// snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.", +// points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s); +// print_page_cache_descr(descr, buffer, false); + + // loop to fill the gap + usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC; + usec_t last_point_filled_ut = last_point_in_time_ut + step_ut; + + while (last_point_filled_ut < point_in_time_ut) { + rrdeng_store_metric_next_internal( + collection_handle, last_point_filled_ut, NAN, NAN, NAN, + 1, 0, SN_EMPTY_SLOT); + + last_point_filled_ut += step_ut; + } + } + } + } + + rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags); +} + + /* * Releases the database reference from the handle for storing metrics. * Returns 1 if it's safe to delete the dimension. */ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle; - struct pg_cache_page_index *page_index = metric_handle->page_index; + struct pg_cache_page_index *page_index = handle->page_index; uint8_t can_delete_metric = 0; @@ -378,6 +539,18 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { return can_delete_metric; } +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct pg_cache_page_index *page_index = handle->page_index; + rrdeng_store_metric_flush_current_page(collection_handle); + uv_rwlock_rdlock(&page_index->lock); + page_index->latest_update_every_s = update_every; + uv_rwlock_rdunlock(&page_index->lock); +} + +// ---------------------------------------------------------------------------- +// query ops + //static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info) //{ // return (uint32_t *)&page_info->scratch[0]; @@ -392,49 +565,45 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - struct rrdengine_instance *ctx = metric_handle->ctx; - RRDDIM *rd = metric_handle->rd; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + struct rrdengine_instance *ctx = page_index->ctx; // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); struct rrdeng_query_handle *handle; unsigned pages_nr; - rrdimm_handle->start_time = start_time; - rrdimm_handle->end_time = end_time; + if(!page_index->latest_update_every_s) + page_index->latest_update_every_s = default_rrd_update_every; + + rrdimm_handle->start_time_s = start_time_s; + rrdimm_handle->end_time_s = end_time_s; handle = callocz(1, sizeof(struct rrdeng_query_handle)); - handle->next_page_time = start_time; - handle->now = start_time; - handle->tier_query_fetch_type = tier_query_fetch_type; - // TODO we should store the dt of each page in each page - // this will produce wrong values for dt in case the user changes - // the update every of the charts or the tier grouping iterations - handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every; - handle->dt = handle->dt_sec * USEC_PER_SEC; + handle->wanted_start_time_s = start_time_s; + handle->now_s = start_time_s; handle->position = 0; handle->ctx = ctx; - handle->metric_handle = metric_handle; handle->descr = NULL; + handle->dt_s = page_index->latest_update_every_s; rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, + pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) // there are no metrics to load - handle->next_page_time = INVALID_TIME; + handle->wanted_start_time_s = INVALID_TIME; } -static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { +static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; struct rrdengine_instance *ctx = handle->ctx; struct rrdeng_page_descr *descr = handle->descr; uint32_t page_length; - usec_t page_end_time; + usec_t page_end_time_ut; unsigned position; if (likely(descr)) { @@ -446,14 +615,15 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { pg_cache_put(ctx, descr); handle->descr = NULL; - handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1; + handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s); - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) + if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s)) return 1; } - usec_t next_page_time = handle->next_page_time * USEC_PER_SEC; - descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); + usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC; + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, + wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC); if (NULL == descr) return 1; @@ -462,77 +632,116 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) { #endif handle->descr = descr; - pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length); - if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time)) + pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length); + if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) { + error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)", + descr->start_time_ut, page_end_time_ut, descr->update_every_s); return 1; + } - if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) { + if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) { // we're in the middle of the page somewhere unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr); - position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) / - (page_end_time - descr->start_time); + position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) / + (page_end_time_ut - descr->start_time_ut); } else position = 0; - handle->page_end_time = page_end_time; + handle->page_end_time_ut = page_end_time_ut; handle->page_length = page_length; + handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); handle->page = descr->pg_cache_descr->page; - usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr); - if (likely(entries > 1)) - handle->dt = (page_end_time - descr->start_time) / (entries - 1); - else { - // TODO we should store the dt of each page in each page - // now we keep the dt of whatever was before - ; - } - - handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC); + handle->dt_s = descr->update_every_s; handle->position = position; +// if(debug_this) +// info("DBENGINE: rrdeng_load_page_next(), " +// "position:%d, " +// "start_time_ut:%llu, " +// "page_end_time_ut:%llu, " +// "next_page_time_ut:%llu, " +// "in_out:%s" +// , position +// , descr->start_time_ut +// , page_end_time_ut +// , +// wanted_start_time_ut, in_out?"true":"false" +// ); + return 0; } // Returns the metric and sets its timestamp into current_time // IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) // IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES -STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; // struct rrdeng_metric_handle *metric_handle = handle->metric_handle; - STORAGE_POINT sp; struct rrdeng_page_descr *descr = handle->descr; + time_t now = handle->now_s + handle->dt_s; + +// bool debug_this = false; +// { +// unsigned char u[16] = { 0x0C, 0x0A, 0x40, 0xD6, 0x2A, 0x43, 0x4A, 0x7C, 0x95, 0xF7, 0xD1, 0x1E, 0x0C, 0x9E, 0x8A, 0xE7 }; +// if(uuid_compare(u, handle->page_index->id) == 0) { +// char buffer[100]; +// snprintfz(buffer, 100, "load system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// debug_this = true; +// } +// } + + STORAGE_POINT sp; unsigned position = handle->position + 1; - time_t now = handle->now + handle->dt_sec; storage_number_tier1_t tier1_value; - if (unlikely(INVALID_TIME == handle->next_page_time)) { - handle->next_page_time = INVALID_TIME; - handle->now = now; - storage_point_empty(sp, now - handle->dt_sec, now); + if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) { + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); return sp; } if (unlikely(!descr || position >= handle->entries)) { // We need to get a new page - if(rrdeng_load_page_next(rrdimm_handle)) { + if(rrdeng_load_page_next(rrddim_handle, false)) { // next calls will not load any more metrics - handle->next_page_time = INVALID_TIME; - handle->now = now; - storage_point_empty(sp, now - handle->dt_sec, now); + handle->wanted_start_time_s = INVALID_TIME; + handle->now_s = now; + storage_point_empty(sp, now - handle->dt_s, now); return sp; } descr = handle->descr; position = handle->position; - now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC); + now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s); + +// if(debug_this) { +// char buffer[100]; +// snprintfz(buffer, 100, "NEW PAGE system.cpu, now:%u, dt:%u, position:%u page_index first:%u, last:%u", +// (uint32_t)(now), +// (uint32_t)(handle->dt_s), +// (uint32_t)(handle->position), +// (uint32_t)(handle->page_index->oldest_time / USEC_PER_SEC), +// (uint32_t)(handle->page_index->latest_time / USEC_PER_SEC)); +// +// print_page_cache_descr(descr, buffer, false); +// } } - sp.start_time = now - handle->dt_sec; + sp.start_time = now - handle->dt_s; sp.end_time = now; handle->position = position; - handle->now = now; + handle->now_s = now; switch(descr->type) { case PAGE_METRICS: { @@ -567,24 +776,32 @@ STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) break; } - if (unlikely(now >= rrdimm_handle->end_time)) { + if (unlikely(now >= rrddim_handle->end_time_s)) { // next calls will not load any more metrics - handle->next_page_time = INVALID_TIME; + handle->wanted_start_time_s = INVALID_TIME; } +// if(debug_this) +// info("DBENGINE: returning point: " +// "time from %ld to %ld // query from %ld to %ld // wanted_start_time_s %ld" +// , sp.start_time, sp.end_time +// , rrddim_handle->start_time_s, rrddim_handle->end_time_s +// , handle->wanted_start_time_s +// ); + return sp; } -int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; - return (INVALID_TIME == handle->next_page_time); + return (INVALID_TIME == handle->wanted_start_time_s); } /* * Releases the database reference from the handle for loading metrics. */ -void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; struct rrdengine_instance *ctx = handle->ctx; @@ -603,46 +820,12 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) } time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - - struct pg_cache_page_index *page_index = metric_handle->page_index; - return page_index->latest_time / USEC_PER_SEC; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->latest_time_ut / USEC_PER_SEC); } time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle; - - struct pg_cache_page_index *page_index = metric_handle->page_index; - return page_index->oldest_time / USEC_PER_SEC; -} - -int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier) -{ - struct page_cache *pg_cache; - struct rrdengine_instance *ctx; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - - ctx = get_rrdeng_ctx_from_host(localhost, tier); - if (unlikely(!ctx)) { - error("Failed to fetch multidb context"); - return 1; - } - pg_cache = &ctx->pg_cache; - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - - if (likely(page_index)) { - *first_entry_t = page_index->oldest_time / USEC_PER_SEC; - *last_entry_t = page_index->latest_time / USEC_PER_SEC; - return 0; - } - - return 1; + struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; + return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC); } int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t) @@ -667,8 +850,8 @@ int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (likely(page_index)) { - *first_entry_t = page_index->oldest_time / USEC_PER_SEC; - *last_entry_t = page_index->latest_time / USEC_PER_SEC; + *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC; + *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC; return 0; } @@ -695,7 +878,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde debug(D_RRDENGINE, "Created new page:"); if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); + print_page_cache_descr(descr, "", true); rrdeng_page_descr_mutex_unlock(ctx, descr); *ret_descr = descr; return page; @@ -767,13 +950,13 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void ** } /* Gets a reference for the page */ -void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle) { struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; debug(D_RRDENGINE, "Reading existing page:"); - descr = pg_cache_lookup(ctx, NULL, id, point_in_time); + descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut); if (NULL == descr) { *handle = NULL; @@ -849,7 +1032,7 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) * Returns 0 on success, negative on error */ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb, int tier) { + unsigned disk_space_mb, size_t tier) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; @@ -897,7 +1080,6 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; ctx->metric_API_max_producers = 0; ctx->quiesce = NO_QUIESCE; - ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */ ctx->host = host; memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); @@ -918,11 +1100,11 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } - error = metalog_init(ctx); - if (error) { - error("Failed to initialize metadata log file event loop."); - goto error_after_rrdeng_worker; - } +// error = metalog_init(ctx); +// if (error) { +// error("Failed to initialize metadata log file event loop."); +// goto error_after_rrdeng_worker; +// } return 0; @@ -1010,13 +1192,13 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); if(likely(points > 1)) - update_every_usec = (descr->end_time - descr->start_time) / (points - 1); + update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1); else { update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC; stats.single_point_pages++; } - time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC); + time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC); stats.extents_pages++; stats.pages_uncompressed_bytes += descr->page_length; @@ -1028,11 +1210,11 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { stats.page_types[descr->type].pages_duration_secs += duration_secs; stats.page_types[descr->type].points += points; - if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t) - stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC; + if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t) + stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC; - if(!stats.last_t || descr->end_time > stats.last_t) - stats.last_t = descr->end_time / USEC_PER_SEC; + if(!stats.last_t || descr->end_time_ut > stats.last_t) + stats.last_t = descr->end_time_ut / USEC_PER_SEC; } } } @@ -1072,7 +1254,7 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) { } } - stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index)); + stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment)); stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr)); stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile)); stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr)); diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 509aa48c..85375044 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -25,58 +25,63 @@ extern size_t page_type_size[]; #define PAGE_POINT_SIZE_BYTES(x) page_type_size[(x)->type] struct rrdeng_region_info { - time_t start_time; + time_t start_time_s; int update_every; unsigned points; }; -extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); -extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr); +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id); -extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); -extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); -extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); +void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle); +void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle); +void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); -extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid); -extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, +void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid); +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid); -extern STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance); -extern void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg); +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); -extern STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle); -extern void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); -extern void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE n, +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every); +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); -extern int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); -extern unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list); +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, + time_t start_time_s, time_t end_time_s); +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle); -extern void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, - time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type); -extern STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle); -extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); -extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle); -extern time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); -extern time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle); +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle); +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); -extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); /* must call once before using anything */ -extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, - unsigned disk_space_mb, int tier); +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb, size_t tier); -extern int rrdeng_exit(struct rrdengine_instance *ctx); -extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx); -extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier); -extern int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t); +int rrdeng_exit(struct rrdengine_instance *ctx); +void rrdeng_prepare_exit(struct rrdengine_instance *ctx); +int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t); + +extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); typedef struct rrdengine_size_statistics { size_t default_granularity_secs; @@ -134,6 +139,6 @@ typedef struct rrdengine_size_statistics { double average_page_size_bytes; } RRDENG_SIZE_STATS; -extern RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); +RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); #endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c index 287b86be..58bd9c43 100644 --- a/database/engine/rrdenginelib.c +++ b/database/engine/rrdenginelib.c @@ -4,28 +4,45 @@ #define BUFSIZE (512) /* Caller must hold descriptor lock */ -void print_page_cache_descr(struct rrdeng_page_descr *descr) +void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug) { - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - char uuid_str[UUID_STR_LEN]; - char str[BUFSIZE + 1]; - int pos = 0; + if(log_debug && !(debug_flags & D_RRDENGINE)) + return; - uuid_unparse_lower(*descr->id, uuid_str); - pos += snprintfz(str, BUFSIZE - pos, "page(%p) id=%s\n" - "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", - pg_cache_descr->page, uuid_str, - descr->page_length, - (uint64_t)descr->start_time, - (uint64_t)descr->end_time); - if (!descr->extent) { - pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); - } else { - pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset); + BUFFER *wb = buffer_create(512); + + if(!descr) { + buffer_sprintf(wb, "DBENGINE: %s : descr is NULL", msg); } + else { + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + char uuid_str[UUID_STR_LEN]; + + uuid_unparse_lower(*descr->id, uuid_str); + buffer_sprintf(wb, "DBENGINE: %s : page(%p) metric:%s, len:%"PRIu32", time:%"PRIu64"->%"PRIu64", update_every:%u, type:%u, xt_offset:", + msg, + pg_cache_descr->page, uuid_str, + descr->page_length, + (uint64_t)descr->start_time_ut, + (uint64_t)descr->end_time_ut, + (uint32_t)descr->update_every_s, + (uint32_t)descr->type + ); + if (!descr->extent) { + buffer_strcat(wb, "N/A"); + } else { + buffer_sprintf(wb, "%"PRIu64, descr->extent->offset); + } + + buffer_sprintf(wb, ", flags:0x%2.2lX refcnt:%u", pg_cache_descr->flags, pg_cache_descr->refcnt); + } + + if(log_debug) + debug(D_RRDENGINE, "%s", buffer_tostring(wb)); + else + internal_error(true, "%s", buffer_tostring(wb)); - snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt); - debug(D_RRDENGINE, "%s", str); + buffer_free(wb); } void print_page_descr(struct rrdeng_page_descr *descr) @@ -39,8 +56,8 @@ void print_page_descr(struct rrdeng_page_descr *descr) "--->len:%"PRIu32" time:%"PRIu64"->%"PRIu64" xt_offset:", uuid_str, descr->page_length, - (uint64_t)descr->start_time, - (uint64_t)descr->end_time); + (uint64_t)descr->start_time_ut, + (uint64_t)descr->end_time_ut); if (!descr->extent) { pos += snprintfz(str + pos, BUFSIZE - pos, "N/A"); } else { diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index 32eebf10..6b1a15fb 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -83,10 +83,10 @@ static inline void crc32set(void *crcp, uLong crc) *(uint32_t *)crcp = crc; } -extern void print_page_cache_descr(struct rrdeng_page_descr *page_cache_descr); -extern void print_page_descr(struct rrdeng_page_descr *descr); -extern int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); -extern int open_file_for_io(char *path, int flags, uv_file *file, int direct); +void print_page_cache_descr(struct rrdeng_page_descr *descr, const char *msg, bool log_debug); +void print_page_descr(struct rrdeng_page_descr *descr); +int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); +int open_file_for_io(char *path, int flags, uv_file *file, int direct); static inline int open_file_direct_io(char *path, int flags, uv_file *file) { return open_file_for_io(path, flags, file, 1); @@ -95,8 +95,8 @@ static inline int open_file_buffered_io(char *path, int flags, uv_file *file) { return open_file_for_io(path, flags, file, 0); } -extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); -extern int compute_multidb_diskspace(); -extern int is_legacy_child(const char *machine_guid); +char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t size); +int compute_multidb_diskspace(); +int is_legacy_child(const char *machine_guid); #endif /* NETDATA_RRDENGINELIB_H */ diff --git a/database/engine/rrdenglocking.h b/database/engine/rrdenglocking.h index 127ddc90..078eab38 100644 --- a/database/engine/rrdenglocking.h +++ b/database/engine/rrdenglocking.h @@ -8,10 +8,10 @@ /* Forward declarations */ struct page_cache_descr; -extern struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx); -extern void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr); -extern void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); -extern void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance *ctx); +void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr); +void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); +void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); #endif /* NETDATA_RRDENGLOCKING_H */
\ No newline at end of file |