summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c79
1 files changed, 60 insertions, 19 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 9f43f445..8b35051d 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -9,20 +9,28 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
-static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT;
+unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
#endif
void *dbengine_page_alloc() {
- void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
- if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
+ void *page = NULL;
+ if (unlikely(db_engine_use_malloc))
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ else {
+ page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
+ if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
+ }
return page;
}
void dbengine_page_free(void *page) {
- munmap(page, RRDENG_BLOCK_SIZE);
+ if (unlikely(db_engine_use_malloc))
+ freez(page);
+ else
+ munmap(page, RRDENG_BLOCK_SIZE);
}
static void sanity_check(void)
@@ -227,6 +235,43 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
freez(xt_io_descr);
}
+static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) {
+ switch(type) {
+ case PAGE_METRICS: {
+ storage_number n = pack_storage_number(NAN, SN_FLAG_NONE);
+ storage_number *array = (storage_number *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ case PAGE_TIER: {
+ storage_number_tier1_t n = {
+ .min_value = NAN,
+ .max_value = NAN,
+ .sum_value = NAN,
+ .count = 1,
+ .anomaly_count = 0,
+ };
+ storage_number_tier1_t *array = (storage_number_tier1_t *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type);
+ logged = true;
+ }
+ memset(page, 0, page_length);
+ }
+ }
+}
+
void read_extent_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -351,8 +396,7 @@ after_crc_check:
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
- /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
- memset(page, SN_EMPTY_SLOT, descr->page_length);
+ fill_page_with_nulls(page, descr->page_length, descr->type);
} else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
(void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
} else {
@@ -697,7 +741,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
- descr != NULL && count != pages_per_extent ;
+ descr != NULL && count != rrdeng_pages_per_extent;
PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
@@ -773,7 +817,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
descr = xt_io_descr->descr_array[i];
- header->descr[i].type = PAGE_METRICS;
+ header->descr[i].type = descr->type;
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
header->descr[i].start_time = descr->start_time;
@@ -879,6 +923,7 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc)
wc->cleanup_thread_deleting_files = 0;
aclk_data_rotated();
+ rrdcontext_db_rotation();
/* interrupt event loop */
uv_stop(wc->loop);
@@ -1066,16 +1111,12 @@ struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
static void load_configuration_dynamic(void)
{
- unsigned read_num;
- static int printed_error = 0;
-
- read_num = (unsigned) config_get_number(CONFIG_SECTION_GLOBAL, "dbengine extent pages",
- MAX_PAGES_PER_EXTENT);
- if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) {
- pages_per_extent = read_num;
- } else if (!printed_error) {
- printed_error = 1;
- error("Invalid dbengine extent pages %u given. Defaulting to %u.", read_num, pages_per_extent);
+ unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT);
+ if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT)
+ rrdeng_pages_per_extent = read_num;
+ else {
+ error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent);
+ config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent);
}
}
@@ -1335,7 +1376,7 @@ void rrdengine_main(void)
struct rrdengine_instance *ctx;
sanity_check();
- ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+ ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0);
if (ret) {
exit(ret);
}