diff options
Diffstat (limited to 'database/engine/pagecache.c')
-rw-r--r-- | database/engine/pagecache.c | 155 |
1 files changed, 110 insertions, 45 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 449138b4..a1820710 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) uv_cond_broadcast(&pg_cache_descr->cond); } +void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +{ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_wake_up_waiters_unsafe(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); +} + /* * The caller must hold page descriptor lock. * The lock will be released and re-acquired. The descriptor is not guaranteed @@ -116,6 +123,24 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) } /* + * The caller must hold page descriptor lock. + * The lock will be released and re-acquired. The descriptor is not guaranteed + * to exist after this function returns. + * Returns UV_ETIMEDOUT if timeout_sec seconds pass. + */ +int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) +{ + int ret; + struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + + ++pg_cache_descr->waiters; + ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); + --pg_cache_descr->waiters; + + return ret; +} + +/* * Returns page flags. * The lock will be released and re-acquired. The descriptor is not guaranteed * to exist after this function returns. @@ -135,10 +160,8 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_ /* * The caller must hold page descriptor lock. - * Gets a reference to the page descriptor. - * Returns 1 on success and 0 on failure. */ -int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; @@ -146,25 +169,25 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_acces (exclusive_access && pg_cache_descr->refcnt)) { return 0; } - if (exclusive_access) - pg_cache_descr->flags |= RRD_PAGE_LOCKED; - ++pg_cache_descr->refcnt; return 1; } /* * The caller must hold page descriptor lock. - * Same return values as pg_cache_try_get_unsafe() without doing anything. + * Gets a reference to the page descriptor. + * Returns 1 on success and 0 on failure. */ -int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) +int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || - (exclusive_access && pg_cache_descr->refcnt)) { + if (!pg_cache_can_get_unsafe(descr, exclusive_access)) return 0; - } + + if (exclusive_access) + pg_cache_descr->flags |= RRD_PAGE_LOCKED; + ++pg_cache_descr->refcnt; return 1; } @@ -212,23 +235,31 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb /* * This function returns the maximum number of pages allowed in the page cache. - * The caller must hold the page cache lock. */ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) { /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers; + return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers; } /* * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the * number of pages below that number. - * The caller must hold the page cache lock. */ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) { /* it's twice the number of producers since we pin 2 pages per producer */ - return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers; + return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers; +} + +/* + * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page + * cache. + */ +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) +{ + /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; } /* @@ -370,31 +401,52 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) return 0; } -void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty) +/** + * Deletes a page from the database. + * Callers of this function need to make sure they're not deleting the same descriptor concurrently. + * @param ctx is the database instance. + * @param descr is the page descriptor. + * @param remove_dirty must be non-zero if the page to be deleted is dirty. + * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. + * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not + * NULL. Otherwise, metric_id is not set. + * @return 1 if it's safe to delete the metric, 0 otherwise. + */ +uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder, uuid_t *metric_id) { struct page_cache *pg_cache = &ctx->pg_cache; struct page_cache_descr *pg_cache_descr = NULL; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; int ret; + uint8_t can_delete_metric = 0; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - assert(NULL != PValue); + fatal_assert(NULL != PValue); page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); uv_rwlock_wrlock(&page_index->lock); ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); - uv_rwlock_wrunlock(&page_index->lock); if (unlikely(0 == ret)) { + uv_rwlock_wrunlock(&page_index->lock); error("Page under deletion was not in index."); if (unlikely(debug_flags & D_RRDENGINE)) { print_page_descr(descr); } goto destroy; } - assert(1 == ret); + --page_index->page_count; + if (!page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + if (metric_id) { + memcpy(metric_id, page_index->id, sizeof(uuid_t)); + } + } + uv_rwlock_wrunlock(&page_index->lock); + fatal_assert(1 == ret); uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); ++ctx->stats.pg_cache_deletions; @@ -403,13 +455,18 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; - while (!pg_cache_try_get_unsafe(descr, 1)) { - debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_wait_event_unsafe(descr); + if (!is_exclusive_holder) { + /* If we don't hold an exclusive page reference get one */ + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } } - if (!remove_dirty) { + if (remove_dirty) { + pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; + } else { /* even a locked page could be dirty */ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); @@ -429,11 +486,16 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); } pg_cache_put(ctx, descr); - - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */ + (void)sleep_usec(1000); /* 1 msec */ + } destroy: freez(descr); pg_cache_update_metric_times(page_index); + + return can_delete_metric; } static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) @@ -521,7 +583,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) uv_rwlock_rdunlock(&page_index->lock); if (unlikely(NULL == firstPValue)) { - assert(NULL == lastPValue); + fatal_assert(NULL == lastPValue); page_index->oldest_time = page_index->latest_time = INVALID_TIME; return; } @@ -542,7 +604,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index /* there is page cache descriptor pre-allocated state */ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); + fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { pg_cache_reserve_pages(ctx, 1); if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) @@ -553,7 +615,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index if (unlikely(NULL == index)) { uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - assert(NULL != PValue); + fatal_assert(NULL != PValue); page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); } else { @@ -563,6 +625,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_wrlock(&page_index->lock); PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0); *PValue = descr; + ++page_index->page_count; pg_cache_add_new_metric_time(page_index, descr); uv_rwlock_wrunlock(&page_index->lock); @@ -577,7 +640,7 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); @@ -617,7 +680,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c Word_t Index; (void)pg_cache; - assert(NULL != page_index); + fatal_assert(NULL != page_index); Index = (Word_t)(point_in_time / USEC_PER_SEC); uv_rwlock_rdlock(&page_index->lock); @@ -658,11 +721,11 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta unsigned i, j, k, preload_count, count, page_info_array_max_size; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; Word_t Index; uint8_t failed_to_reserve; - assert(NULL != ret_page_indexp); + fatal_assert(NULL != ret_page_indexp); uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); @@ -804,7 +867,7 @@ struct rrdeng_page_descr * struct page_cache_descr *pg_cache_descr = NULL; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; Word_t Index; uint8_t page_not_in_cache; @@ -911,7 +974,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index struct page_cache_descr *pg_cache_descr = NULL; unsigned long flags; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; + struct pg_cache_page_index *page_index = NULL; uint8_t page_not_in_cache; if (unlikely(NULL == index)) { @@ -1003,10 +1066,12 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) page_index = mallocz(sizeof(*page_index)); page_index->JudyL_array = (Pvoid_t) NULL; uuid_copy(page_index->id, *id); - assert(0 == uv_rwlock_init(&page_index->lock)); + fatal_assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time = INVALID_TIME; page_index->latest_time = INVALID_TIME; page_index->prev = NULL; + page_index->page_count = 0; + page_index->writers = 0; return page_index; } @@ -1017,7 +1082,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx) pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; pg_cache->metrics_index.last_page_index = NULL; - assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); } static void init_replaceQ(struct rrdengine_instance *ctx) @@ -1026,7 +1091,7 @@ static void init_replaceQ(struct rrdengine_instance *ctx) pg_cache->replaceQ.head = NULL; pg_cache->replaceQ.tail = NULL; - assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); } static void init_committed_page_index(struct rrdengine_instance *ctx) @@ -1034,7 +1099,7 @@ static void init_committed_page_index(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL; - assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); pg_cache->committed_page_index.latest_corr_id = 0; pg_cache->committed_page_index.nr_committed_pages = 0; } @@ -1045,7 +1110,7 @@ void init_page_cache(struct rrdengine_instance *ctx) pg_cache->page_descriptors = 0; pg_cache->populated_pages = 0; - assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); init_metrics_index(ctx); init_replaceQ(ctx); @@ -1064,7 +1129,7 @@ void free_page_cache(struct rrdengine_instance *ctx) /* Free committed page index */ ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); - assert(NULL == pg_cache->committed_page_index.JudyL_array); + fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); bytes_freed += ret_Judy; for (page_index = pg_cache->metrics_index.last_page_index ; @@ -1099,14 +1164,14 @@ void free_page_cache(struct rrdengine_instance *ctx) /* Free page index */ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); - assert(NULL == page_index->JudyL_array); + fatal_assert(NULL == page_index->JudyL_array); bytes_freed += ret_Judy; freez(page_index); bytes_freed += sizeof(*page_index); } /* Free metrics index */ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); - assert(NULL == pg_cache->metrics_index.JudyHS_array); + fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); bytes_freed += ret_Judy; info("Freed %lu bytes of memory from page cache.", bytes_freed); |