summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/pagecache.c68
-rw-r--r--database/engine/pagecache.h2
-rw-r--r--database/engine/rrdengine.c53
-rw-r--r--database/engine/rrdengine.h20
-rwxr-xr-xdatabase/engine/rrdengineapi.c30
-rw-r--r--database/engine/rrdengineapi.h1
6 files changed, 155 insertions, 19 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index a18207100..d7698de01 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -699,6 +699,74 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
}
uv_rwlock_rdunlock(&page_index->lock);
}
+
+/**
+ * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
+ * @param ctx DB context
+ * @param id lookup by UUID
+ * @param start_time exact starting time in usec
+ * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
+ * @return the page descriptor or NULL on failure. It can fail if:
+ * 1. The page is already allocated to the page cache.
+ * 2. It did not succeed to get a reference.
+ * 3. It did not succeed to reserve a spot in the page cache.
+ */
+struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+ Word_t Index;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
+ /* Failed to find page or failed to reserve a spot in the cache */
+ return NULL;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ if (NULL == PValue || 0 == descr->page_length) {
+ /* Failed to find non-empty page */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
+ /* Failed to get reference or page is already populated */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+
+ return descr;
+}
+
/**
* Searches for pages in a time range and triggers disk I/O if necessary and possible.
* Does not get a reference.
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 31e9739da..d5350ef56 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -172,6 +172,8 @@ extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid
extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
usec_t point_in_time, pg_cache_page_info_filter_t *filter,
struct rrdeng_page_info *page_info);
+extern struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
+ usec_t start_time);
extern unsigned
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp);
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 43135ff01..0c4a401cb 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -9,6 +9,8 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
+static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT;
+
static void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@@ -305,19 +307,32 @@ after_crc_check:
}
}
- for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
- page = mallocz(RRDENG_BLOCK_SIZE);
- descr = xt_io_descr->descr_array[i];
- for (j = 0, page_offset = 0; j < count; ++j) {
+ for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) {
+ uint8_t is_prefetched_page;
+ descr = NULL;
+ for (j = 0 ; j < xt_io_descr->descr_count; ++j) {
+ struct rrdeng_page_descr *descrj;
+
+ descrj = xt_io_descr->descr_array[j];
/* care, we don't hold the descriptor mutex */
- if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
- header->descr[j].page_length == descr->page_length &&
- header->descr[j].start_time == descr->start_time &&
- header->descr[j].end_time == descr->end_time) {
+ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) &&
+ header->descr[i].page_length == descrj->page_length &&
+ header->descr[i].start_time == descrj->start_time &&
+ header->descr[i].end_time == descrj->end_time) {
+ descr = descrj;
break;
}
- page_offset += header->descr[j].page_length;
}
+ is_prefetched_page = 0;
+ if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
+ descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
+ header->descr[i].start_time);
+ if (!descr)
+ continue; /* Failed to reserve a suitable page */
+ is_prefetched_page = 1;
+ }
+ page = mallocz(RRDENG_BLOCK_SIZE);
+
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
/* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
@@ -334,7 +349,7 @@ after_crc_check:
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
rrdeng_page_descr_mutex_unlock(ctx, descr);
pg_cache_replaceQ_insert(ctx, descr);
- if (xt_io_descr->release_descr) {
+ if (xt_io_descr->release_descr || is_prefetched_page) {
pg_cache_put(ctx, descr);
} else {
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
@@ -666,7 +681,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
- descr != NULL && count != MAX_PAGES_PER_EXTENT ;
+ descr != NULL && count != pages_per_extent ;
PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
@@ -1031,6 +1046,21 @@ struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
return ret;
}
+static void load_configuration_dynamic(void)
+{
+ unsigned read_num;
+ static int printed_error = 0;
+
+ read_num = (unsigned) config_get_number(CONFIG_SECTION_GLOBAL, "dbengine extent pages",
+ MAX_PAGES_PER_EXTENT);
+ if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) {
+ pages_per_extent = read_num;
+ } else if (!printed_error) {
+ printed_error = 1;
+ error("Invalid dbengine extent pages %u given. Defaulting to %u.", read_num, pages_per_extent);
+ }
+}
+
void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
@@ -1084,6 +1114,7 @@ void timer_cb(uv_timer_t* handle)
}
}
}
+ load_configuration_dynamic();
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 87af04bff..2d48665f8 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -56,16 +56,20 @@ enum rrdeng_opcode {
RRDENG_MAX_OPCODE
};
+struct rrdeng_read_page {
+ struct rrdeng_page_descr *page_cache_descr;
+};
+
+struct rrdeng_read_extent {
+ struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
+ int page_count;
+};
+
struct rrdeng_cmd {
enum rrdeng_opcode opcode;
union {
- struct rrdeng_read_page {
- struct rrdeng_page_descr *page_cache_descr;
- } read_page;
- struct rrdeng_read_extent {
- struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
- int page_count;
- } read_extent;
+ struct rrdeng_read_page read_page;
+ struct rrdeng_read_extent read_extent;
struct completion *completion;
};
};
@@ -230,4 +234,4 @@ extern void rrdeng_worker(void* arg);
extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
extern struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
-#endif /* NETDATA_RRDENGINE_H */ \ No newline at end of file
+#endif /* NETDATA_RRDENGINE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 7b2ff5b72..cb46e06e3 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -691,6 +691,36 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd)
return page_index->oldest_time / USEC_PER_SEC;
}
+int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
+{
+ struct page_cache *pg_cache;
+ struct rrdengine_instance *ctx;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index = NULL;
+
+ ctx = get_rrdeng_ctx_from_host(localhost);
+ if (unlikely(!ctx)) {
+ error("Failed to fetch multidb context");
+ return 1;
+ }
+ pg_cache = &ctx->pg_cache;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+ if (likely(page_index)) {
+ *first_entry_t = page_index->oldest_time / USEC_PER_SEC;
+ *last_entry_t = page_index->latest_time / USEC_PER_SEC;
+ return 0;
+ }
+
+ return 1;
+}
+
/* Also gets a reference for the page */
void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 41375b980..00e55e662 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -59,5 +59,6 @@ extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *db
extern int rrdeng_exit(struct rrdengine_instance *ctx);
extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
+extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t);
#endif /* NETDATA_RRDENGINEAPI_H */