summaryrefslogtreecommitdiffstats
path: root/database/engine/pagecache.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/pagecache.c284
1 files changed, 245 insertions, 39 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 124f2448b..1bd4c9436 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -419,6 +419,35 @@ static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
+/* The caller must hold the page index lock */
+static inline struct rrdeng_page_descr *
+ find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
+{
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ return NULL;
+}
+
/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
@@ -510,70 +539,144 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
-/*
- * Searches for a page and triggers disk I/O if necessary and possible.
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return INVALID_TIME;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ return INVALID_TIME;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ return descr->start_time;
+}
+
+/**
+ * Return page information for the first page before point_in_time that satisfies the filter.
+ * @param ctx DB context
+ * @param page_index page index of a metric
+ * @param point_in_time the pages that are searched must be older than this timestamp
+ * @param filter decides if the page satisfies the caller's criteria
+ * @param page_info the result of the search is set in this pointer
+ */
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+ struct rrdeng_page_info *page_info)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ (void)pg_cache;
+ assert(NULL != page_index);
+
+ Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ uv_rwlock_rdlock(&page_index->lock);
+ do {
+ PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ } while (descr != NULL && !filter(descr));
+ if (unlikely(NULL == descr)) {
+ page_info->page_length = 0;
+ page_info->start_time = INVALID_TIME;
+ page_info->end_time = INVALID_TIME;
+ } else {
+ page_info->page_length = descr->page_length;
+ page_info->start_time = descr->start_time;
+ page_info->end_time = descr->end_time;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+}
+/**
+ * Searches for pages in a time range and triggers disk I/O if necessary and possible.
* Does not get a reference.
- * Returns page index pointer for given metric UUID.
+ * @param ctx DB context
+ * @param id UUID
+ * @param start_time inclusive starting time in usec
+ * @param end_time inclusive ending time in usec
+ * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap
+ * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
+ * If page_info_arrayp is set to NULL nothing was allocated.
+ * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
+ * @return the number of pages that overlap with the time range [start_time,end_time].
*/
-struct pg_cache_page_index *
- pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+ struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
struct page_cache_descr *pg_cache_descr = NULL;
- int i, j, k, count, found;
+ unsigned i, j, k, preload_count, count, page_info_array_max_size;
unsigned long flags;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
Word_t Index;
uint8_t failed_to_reserve;
+ assert(NULL != ret_page_indexp);
+
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
if (likely(NULL != PValue)) {
- page_index = *PValue;
+ *ret_page_indexp = page_index = *PValue;
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
if (NULL == PValue) {
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return NULL;
+ *ret_page_indexp = NULL;
+ return 0;
}
uv_rwlock_rdlock(&page_index->lock);
- /* Find first page in range */
- found = 0;
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- if (!found) {
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- }
- if (!found) {
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr) {
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return page_index;
+ *ret_page_indexp = NULL;
+ return 0;
+ } else {
+ Index = (Word_t)(descr->start_time / USEC_PER_SEC);
+ }
+ if (page_info_arrayp) {
+ page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = mallocz(page_info_array_max_size);
}
- for (count = 0 ;
- descr != NULL && is_page_in_time_range(descr, start_time, end_time);
+ for (count = 0, preload_count = 0 ;
+ descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
/* Iterate all pages in range */
if (unlikely(0 == descr->page_length))
continue;
+ if (page_info_arrayp) {
+ if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
+ page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
+ }
+ (*page_info_arrayp)[count].start_time = descr->start_time;
+ (*page_info_arrayp)[count].end_time = descr->end_time;
+ (*page_info_arrayp)[count].page_length = descr->page_length;
+ }
+ ++count;
+
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
flags = pg_cache_descr->flags;
@@ -586,8 +689,8 @@ struct pg_cache_page_index *
}
}
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- preload_array[count++] = descr;
- if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
+ preload_array[preload_count++] = descr;
+ if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
rrdeng_page_descr_mutex_unlock(ctx, descr);
break;
}
@@ -598,7 +701,7 @@ struct pg_cache_page_index *
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
- for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
+ for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
struct rrdeng_cmd cmd;
struct rrdeng_page_descr *next;
@@ -614,7 +717,7 @@ struct pg_cache_page_index *
cmd.read_extent.page_cache_descr[0] = descr;
/* don't use this page again */
preload_array[i] = NULL;
- for (j = 0, k = 1 ; j < count ; ++j) {
+ for (j = 0, k = 1 ; j < preload_count ; ++j) {
next = preload_array[j];
if (NULL == next) {
continue;
@@ -635,7 +738,7 @@ struct pg_cache_page_index *
}
if (failed_to_reserve) {
debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
- for (i = 0 ; i < count ; ++i) {
+ for (i = 0 ; i < preload_count ; ++i) {
descr = preload_array[i];
if (NULL == descr) {
continue;
@@ -643,11 +746,15 @@ struct pg_cache_page_index *
pg_cache_put(ctx, descr);
}
}
- if (!count) {
+ if (!preload_count) {
/* no such page */
debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
}
- return page_index;
+ if (unlikely(0 == count && page_info_arrayp)) {
+ freez(*page_info_arrayp);
+ *page_info_arrayp = NULL;
+ }
+ return count;
}
/*
@@ -757,6 +864,105 @@ struct rrdeng_page_descr *
return descr;
}
+/*
+ * Searches for the first page between start_time and end_time and gets a reference.
+ * start_time and end_time are inclusive.
+ * If index is NULL lookup by UUID (id).
+ */
+struct rrdeng_page_descr *
+pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
+ usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ unsigned long flags;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+ uint8_t page_not_in_cache;
+
+ if (unlikely(NULL == index)) {
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return NULL;
+ }
+ } else {
+ page_index = index;
+ }
+ pg_cache_reserve_pages(ctx, 1);
+
+ page_not_in_cache = 0;
+ uv_rwlock_rdlock(&page_index->lock);
+ while (1) {
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr || 0 == descr->page_length) {
+ /* non-empty page not found */
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ pg_cache_release_pages(ctx, 1);
+ return NULL;
+ }
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
+ if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
+ /* success */
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+ break;
+ }
+ if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
+ struct rrdeng_cmd cmd;
+
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ cmd.opcode = RRDENG_READ_PAGE;
+ cmd.read_page.page_cache_descr = descr;
+ rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+
+ debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
+ pg_cache_wait_event_unsafe(descr);
+ }
+ /* success */
+ /* Downgrade exclusive reference to allow other readers */
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ return descr;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ if (!(flags & RRD_PAGE_POPULATED))
+ page_not_in_cache = 1;
+ pg_cache_wait_event_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ /* reset scan to find again */
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ if (!(flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_set_hot(ctx, descr);
+ pg_cache_release_pages(ctx, 1);
+ if (page_not_in_cache)
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
+ else
+ rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
+ return descr;
+}
+
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
struct pg_cache_page_index *page_index;