diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 79 |
1 files changed, 60 insertions, 19 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 9f43f4456..8b35051d8 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -9,20 +9,28 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; rrdeng_stats_t global_flushing_pressure_page_deletions = 0; -static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT; +unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; #if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif void *dbengine_page_alloc() { - void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); - if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + void *page = NULL; + if (unlikely(db_engine_use_malloc)) + page = mallocz(RRDENG_BLOCK_SIZE); + else { + page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); + if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + } return page; } void dbengine_page_free(void *page) { - munmap(page, RRDENG_BLOCK_SIZE); + if (unlikely(db_engine_use_malloc)) + freez(page); + else + munmap(page, RRDENG_BLOCK_SIZE); } static void sanity_check(void) @@ -227,6 +235,43 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str freez(xt_io_descr); } +static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) { + switch(type) { + case PAGE_METRICS: { + storage_number n = pack_storage_number(NAN, SN_FLAG_NONE); + storage_number *array = (storage_number *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + case PAGE_TIER: { + storage_number_tier1_t n = { + .min_value = NAN, + .max_value = NAN, + .sum_value = NAN, + .count = 1, + .anomaly_count = 0, + }; + storage_number_tier1_t *array = (storage_number_tier1_t *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type); + logged = true; + } + memset(page, 0, page_length); + } + } +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -351,8 +396,7 @@ after_crc_check: /* care, we don't hold the descriptor mutex */ if (have_read_error) { - /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ - memset(page, SN_EMPTY_SLOT, descr->page_length); + fill_page_with_nulls(page, descr->page_length, descr->type); } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); } else { @@ -697,7 +741,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue ; - descr != NULL && count != pages_per_extent ; + descr != NULL && count != rrdeng_pages_per_extent; PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { @@ -773,7 +817,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; descr = xt_io_descr->descr_array[i]; - header->descr[i].type = PAGE_METRICS; + header->descr[i].type = descr->type; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time = descr->start_time; @@ -879,6 +923,7 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc) wc->cleanup_thread_deleting_files = 0; aclk_data_rotated(); + rrdcontext_db_rotation(); /* interrupt event loop */ uv_stop(wc->loop); @@ -1066,16 +1111,12 @@ struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) static void load_configuration_dynamic(void) { - unsigned read_num; - static int printed_error = 0; - - read_num = (unsigned) config_get_number(CONFIG_SECTION_GLOBAL, "dbengine extent pages", - MAX_PAGES_PER_EXTENT); - if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) { - pages_per_extent = read_num; - } else if (!printed_error) { - printed_error = 1; - error("Invalid dbengine extent pages %u given. Defaulting to %u.", read_num, pages_per_extent); + unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT); + if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) + rrdeng_pages_per_extent = read_num; + else { + error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); + config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent); } } @@ -1335,7 +1376,7 @@ void rrdengine_main(void) struct rrdengine_instance *ctx; sanity_check(); - ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0); if (ret) { exit(ret); } |