summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c53
1 files changed, 42 insertions, 11 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 43135ff0..0c4a401c 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -9,6 +9,8 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
+static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT;
+
static void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@@ -305,19 +307,32 @@ after_crc_check:
}
}
- for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
- page = mallocz(RRDENG_BLOCK_SIZE);
- descr = xt_io_descr->descr_array[i];
- for (j = 0, page_offset = 0; j < count; ++j) {
+ for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) {
+ uint8_t is_prefetched_page;
+ descr = NULL;
+ for (j = 0 ; j < xt_io_descr->descr_count; ++j) {
+ struct rrdeng_page_descr *descrj;
+
+ descrj = xt_io_descr->descr_array[j];
/* care, we don't hold the descriptor mutex */
- if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
- header->descr[j].page_length == descr->page_length &&
- header->descr[j].start_time == descr->start_time &&
- header->descr[j].end_time == descr->end_time) {
+ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) &&
+ header->descr[i].page_length == descrj->page_length &&
+ header->descr[i].start_time == descrj->start_time &&
+ header->descr[i].end_time == descrj->end_time) {
+ descr = descrj;
break;
}
- page_offset += header->descr[j].page_length;
}
+ is_prefetched_page = 0;
+ if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
+ descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
+ header->descr[i].start_time);
+ if (!descr)
+ continue; /* Failed to reserve a suitable page */
+ is_prefetched_page = 1;
+ }
+ page = mallocz(RRDENG_BLOCK_SIZE);
+
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
/* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
@@ -334,7 +349,7 @@ after_crc_check:
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
rrdeng_page_descr_mutex_unlock(ctx, descr);
pg_cache_replaceQ_insert(ctx, descr);
- if (xt_io_descr->release_descr) {
+ if (xt_io_descr->release_descr || is_prefetched_page) {
pg_cache_put(ctx, descr);
} else {
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
@@ -666,7 +681,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
- descr != NULL && count != MAX_PAGES_PER_EXTENT ;
+ descr != NULL && count != pages_per_extent ;
PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
@@ -1031,6 +1046,21 @@ struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
return ret;
}
+static void load_configuration_dynamic(void)
+{
+ unsigned read_num;
+ static int printed_error = 0;
+
+ read_num = (unsigned) config_get_number(CONFIG_SECTION_GLOBAL, "dbengine extent pages",
+ MAX_PAGES_PER_EXTENT);
+ if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) {
+ pages_per_extent = read_num;
+ } else if (!printed_error) {
+ printed_error = 1;
+ error("Invalid dbengine extent pages %u given. Defaulting to %u.", read_num, pages_per_extent);
+ }
+}
+
void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
@@ -1084,6 +1114,7 @@ void timer_cb(uv_timer_t* handle)
}
}
}
+ load_configuration_dynamic();
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];