diff options
Diffstat (limited to '')
-rw-r--r-- | database/engine/pagecache.c | 284 |
1 files changed, 245 insertions, 39 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 124f2448b..1bd4c9436 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -419,6 +419,35 @@ static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec return (point_in_time >= descr->start_time && point_in_time <= descr->end_time); } +/* The caller must hold the page index lock */ +static inline struct rrdeng_page_descr * + find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) +{ + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + Index = (Word_t)(start_time / USEC_PER_SEC); + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + if (is_page_in_time_range(descr, start_time, end_time)) { + return descr; + } + } + + return NULL; +} + /* Update metric oldest and latest timestamps efficiently when adding new values */ void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) { @@ -510,70 +539,144 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } -/* - * Searches for a page and triggers disk I/O if necessary and possible. +usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return INVALID_TIME; + } + + uv_rwlock_rdlock(&page_index->lock); + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { + uv_rwlock_rdunlock(&page_index->lock); + return INVALID_TIME; + } + uv_rwlock_rdunlock(&page_index->lock); + return descr->start_time; +} + +/** + * Return page information for the first page before point_in_time that satisfies the filter. + * @param ctx DB context + * @param page_index page index of a metric + * @param point_in_time the pages that are searched must be older than this timestamp + * @param filter decides if the page satisfies the caller's criteria + * @param page_info the result of the search is set in this pointer + */ +void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, + usec_t point_in_time, pg_cache_page_info_filter_t *filter, + struct rrdeng_page_info *page_info) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + Pvoid_t *PValue; + Word_t Index; + + (void)pg_cache; + assert(NULL != page_index); + + Index = (Word_t)(point_in_time / USEC_PER_SEC); + uv_rwlock_rdlock(&page_index->lock); + do { + PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } while (descr != NULL && !filter(descr)); + if (unlikely(NULL == descr)) { + page_info->page_length = 0; + page_info->start_time = INVALID_TIME; + page_info->end_time = INVALID_TIME; + } else { + page_info->page_length = descr->page_length; + page_info->start_time = descr->start_time; + page_info->end_time = descr->end_time; + } + uv_rwlock_rdunlock(&page_index->lock); +} +/** + * Searches for pages in a time range and triggers disk I/O if necessary and possible. * Does not get a reference. - * Returns page index pointer for given metric UUID. + * @param ctx DB context + * @param id UUID + * @param start_time inclusive starting time in usec + * @param end_time inclusive ending time in usec + * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap + * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). + * If page_info_arrayp is set to NULL nothing was allocated. + * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. + * @return the number of pages that overlap with the time range [start_time,end_time]. */ -struct pg_cache_page_index * - pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time) +unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time, + struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; struct page_cache_descr *pg_cache_descr = NULL; - int i, j, k, count, found; + unsigned i, j, k, preload_count, count, page_info_array_max_size; unsigned long flags; Pvoid_t *PValue; struct pg_cache_page_index *page_index; Word_t Index; uint8_t failed_to_reserve; + assert(NULL != ret_page_indexp); + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); if (likely(NULL != PValue)) { - page_index = *PValue; + *ret_page_indexp = page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); if (NULL == PValue) { debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - return NULL; + *ret_page_indexp = NULL; + return 0; } uv_rwlock_rdlock(&page_index->lock); - /* Find first page in range */ - found = 0; - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - found = 1; - } - } - if (!found) { - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - found = 1; - } - } - } - if (!found) { + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr) { uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - return page_index; + *ret_page_indexp = NULL; + return 0; + } else { + Index = (Word_t)(descr->start_time / USEC_PER_SEC); + } + if (page_info_arrayp) { + page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = mallocz(page_info_array_max_size); } - for (count = 0 ; - descr != NULL && is_page_in_time_range(descr, start_time, end_time); + for (count = 0, preload_count = 0 ; + descr != NULL && is_page_in_time_range(descr, start_time, end_time) ; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { /* Iterate all pages in range */ if (unlikely(0 == descr->page_length)) continue; + if (page_info_arrayp) { + if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) { + page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); + *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); + } + (*page_info_arrayp)[count].start_time = descr->start_time; + (*page_info_arrayp)[count].end_time = descr->end_time; + (*page_info_arrayp)[count].page_length = descr->page_length; + } + ++count; + rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; flags = pg_cache_descr->flags; @@ -586,8 +689,8 @@ struct pg_cache_page_index * } } if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - preload_array[count++] = descr; - if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) { + preload_array[preload_count++] = descr; + if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) { rrdeng_page_descr_mutex_unlock(ctx, descr); break; } @@ -598,7 +701,7 @@ struct pg_cache_page_index * uv_rwlock_rdunlock(&page_index->lock); failed_to_reserve = 0; - for (i = 0 ; i < count && !failed_to_reserve ; ++i) { + for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) { struct rrdeng_cmd cmd; struct rrdeng_page_descr *next; @@ -614,7 +717,7 @@ struct pg_cache_page_index * cmd.read_extent.page_cache_descr[0] = descr; /* don't use this page again */ preload_array[i] = NULL; - for (j = 0, k = 1 ; j < count ; ++j) { + for (j = 0, k = 1 ; j < preload_count ; ++j) { next = preload_array[j]; if (NULL == next) { continue; @@ -635,7 +738,7 @@ struct pg_cache_page_index * } if (failed_to_reserve) { debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); - for (i = 0 ; i < count ; ++i) { + for (i = 0 ; i < preload_count ; ++i) { descr = preload_array[i]; if (NULL == descr) { continue; @@ -643,11 +746,15 @@ struct pg_cache_page_index * pg_cache_put(ctx, descr); } } - if (!count) { + if (!preload_count) { /* no such page */ debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); } - return page_index; + if (unlikely(0 == count && page_info_arrayp)) { + freez(*page_info_arrayp); + *page_info_arrayp = NULL; + } + return count; } /* @@ -757,6 +864,105 @@ struct rrdeng_page_descr * return descr; } +/* + * Searches for the first page between start_time and end_time and gets a reference. + * start_time and end_time are inclusive. + * If index is NULL lookup by UUID (id). + */ +struct rrdeng_page_descr * +pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, + usec_t start_time, usec_t end_time) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + struct rrdeng_page_descr *descr = NULL; + struct page_cache_descr *pg_cache_descr = NULL; + unsigned long flags; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + uint8_t page_not_in_cache; + + if (unlikely(NULL == index)) { + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + return NULL; + } + } else { + page_index = index; + } + pg_cache_reserve_pages(ctx, 1); + + page_not_in_cache = 0; + uv_rwlock_rdlock(&page_index->lock); + while (1) { + descr = find_first_page_in_time_range(page_index, start_time, end_time); + if (NULL == descr || 0 == descr->page_length) { + /* non-empty page not found */ + uv_rwlock_rdunlock(&page_index->lock); + + pg_cache_release_pages(ctx, 1); + return NULL; + } + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + flags = pg_cache_descr->flags; + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { + /* success */ + rrdeng_page_descr_mutex_unlock(ctx, descr); + debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + break; + } + if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { + struct rrdeng_cmd cmd; + + uv_rwlock_rdunlock(&page_index->lock); + + cmd.opcode = RRDENG_READ_PAGE; + cmd.read_page.page_cache_descr = descr; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { + pg_cache_wait_event_unsafe(descr); + } + /* success */ + /* Downgrade exclusive reference to allow other readers */ + pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + return descr; + } + uv_rwlock_rdunlock(&page_index->lock); + debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + if (!(flags & RRD_PAGE_POPULATED)) + page_not_in_cache = 1; + pg_cache_wait_event_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + /* reset scan to find again */ + uv_rwlock_rdlock(&page_index->lock); + } + uv_rwlock_rdunlock(&page_index->lock); + + if (!(flags & RRD_PAGE_DIRTY)) + pg_cache_replaceQ_set_hot(ctx, descr); + pg_cache_release_pages(ctx, 1); + if (page_not_in_cache) + rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); + else + rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); + return descr; +} + struct pg_cache_page_index *create_page_index(uuid_t *id) { struct pg_cache_page_index *page_index; |