Diffstat (limited to 'database/engine/pagecache.c')
-rw-r--r--  database/engine/pagecache.c | 155
1 file changed, 110 insertions, 45 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 449138b4..a1820710 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
uv_cond_broadcast(&pg_cache_descr->cond);
}
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
/*
* The caller must hold page descriptor lock.
* The lock will be released and re-acquired. The descriptor is not guaranteed
@@ -116,6 +123,24 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
}
/*
+ * The caller must hold page descriptor lock.
+ * The lock will be released and re-acquired. The descriptor is not guaranteed
+ * to exist after this function returns.
+ * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
+ */
+int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
+{
+ int ret;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
+ --pg_cache_descr->waiters;
+
+ return ret;
+}
+
+/*
* Returns page flags.
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
@@ -135,10 +160,8 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_
/*
* The caller must hold page descriptor lock.
- * Gets a reference to the page descriptor.
- * Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
@@ -146,25 +169,25 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_acces
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
- if (exclusive_access)
- pg_cache_descr->flags |= RRD_PAGE_LOCKED;
- ++pg_cache_descr->refcnt;
return 1;
}
/*
* The caller must hold page descriptor lock.
- * Same return values as pg_cache_try_get_unsafe() without doing anything.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && pg_cache_descr->refcnt)) {
+ if (!pg_cache_can_get_unsafe(descr, exclusive_access))
return 0;
- }
+
+ if (exclusive_access)
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -212,23 +235,31 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
/*
* This function returns the maximum number of pages allowed in the page cache.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
- return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers;
+ return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers;
}
/*
* This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
* number of pages below that number.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
- return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
+ return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers;
+}
+
+/*
+ * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
+ * cache.
+ */
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
+{
+ /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
+ return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}
/*
@@ -370,31 +401,52 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 0;
}
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
+/**
+ * Deletes a page from the database.
+ * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
+ * @param ctx is the database instance.
+ * @param descr is the page descriptor.
+ * @param remove_dirty must be non-zero if the page to be deleted is dirty.
+ * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
+ * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
+ * NULL. Otherwise, metric_id is not set.
+ * @return 1 if it's safe to delete the metric, 0 otherwise.
+ */
+uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder, uuid_t *metric_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
int ret;
+ uint8_t can_delete_metric = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
+ fatal_assert(NULL != PValue);
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
uv_rwlock_wrlock(&page_index->lock);
ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
- uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(0 == ret)) {
+ uv_rwlock_wrunlock(&page_index->lock);
error("Page under deletion was not in index.");
if (unlikely(debug_flags & D_RRDENGINE)) {
print_page_descr(descr);
}
goto destroy;
}
- assert(1 == ret);
+ --page_index->page_count;
+ if (!page_index->writers && !page_index->page_count) {
+ can_delete_metric = 1;
+ if (metric_id) {
+ memcpy(metric_id, page_index->id, sizeof(uuid_t));
+ }
+ }
+ uv_rwlock_wrunlock(&page_index->lock);
+ fatal_assert(1 == ret);
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
++ctx->stats.pg_cache_deletions;
@@ -403,13 +455,18 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
+ if (!is_exclusive_holder) {
+ /* If we don't hold an exclusive page reference get one */
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
}
- if (!remove_dirty) {
+ if (remove_dirty) {
+ pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
+ } else {
/* even a locked page could be dirty */
while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
@@ -429,11 +486,16 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
pg_cache_put(ctx, descr);
-
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
+ (void)sleep_usec(1000); /* 1 msec */
+ }
destroy:
freez(descr);
pg_cache_update_metric_times(page_index);
+
+ return can_delete_metric;
}
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
@@ -521,7 +583,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
uv_rwlock_rdunlock(&page_index->lock);
if (unlikely(NULL == firstPValue)) {
- assert(NULL == lastPValue);
+ fatal_assert(NULL == lastPValue);
page_index->oldest_time = page_index->latest_time = INVALID_TIME;
return;
}
@@ -542,7 +604,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
/* there is page cache descriptor pre-allocated state */
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
pg_cache_reserve_pages(ctx, 1);
if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
@@ -553,7 +615,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
if (unlikely(NULL == index)) {
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
+ fatal_assert(NULL != PValue);
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
} else {
@@ -563,6 +625,7 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_wrlock(&page_index->lock);
PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
*PValue = descr;
+ ++page_index->page_count;
pg_cache_add_new_metric_time(page_index, descr);
uv_rwlock_wrunlock(&page_index->lock);
@@ -577,7 +640,7 @@ usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
@@ -617,7 +680,7 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
Word_t Index;
(void)pg_cache;
- assert(NULL != page_index);
+ fatal_assert(NULL != page_index);
Index = (Word_t)(point_in_time / USEC_PER_SEC);
uv_rwlock_rdlock(&page_index->lock);
@@ -658,11 +721,11 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
unsigned i, j, k, preload_count, count, page_info_array_max_size;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
Word_t Index;
uint8_t failed_to_reserve;
- assert(NULL != ret_page_indexp);
+ fatal_assert(NULL != ret_page_indexp);
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
@@ -804,7 +867,7 @@ struct rrdeng_page_descr *
struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
Word_t Index;
uint8_t page_not_in_cache;
@@ -911,7 +974,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
+ struct pg_cache_page_index *page_index = NULL;
uint8_t page_not_in_cache;
if (unlikely(NULL == index)) {
@@ -1003,10 +1066,12 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
page_index = mallocz(sizeof(*page_index));
page_index->JudyL_array = (Pvoid_t) NULL;
uuid_copy(page_index->id, *id);
- assert(0 == uv_rwlock_init(&page_index->lock));
+ fatal_assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
page_index->prev = NULL;
+ page_index->page_count = 0;
+ page_index->writers = 0;
return page_index;
}
@@ -1017,7 +1082,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
pg_cache->metrics_index.last_page_index = NULL;
- assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
static void init_replaceQ(struct rrdengine_instance *ctx)
@@ -1026,7 +1091,7 @@ static void init_replaceQ(struct rrdengine_instance *ctx)
pg_cache->replaceQ.head = NULL;
pg_cache->replaceQ.tail = NULL;
- assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}
static void init_committed_page_index(struct rrdengine_instance *ctx)
@@ -1034,7 +1099,7 @@ static void init_committed_page_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
- assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
pg_cache->committed_page_index.latest_corr_id = 0;
pg_cache->committed_page_index.nr_committed_pages = 0;
}
@@ -1045,7 +1110,7 @@ void init_page_cache(struct rrdengine_instance *ctx)
pg_cache->page_descriptors = 0;
pg_cache->populated_pages = 0;
- assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
init_metrics_index(ctx);
init_replaceQ(ctx);
@@ -1064,7 +1129,7 @@ void free_page_cache(struct rrdengine_instance *ctx)
/* Free committed page index */
ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
- assert(NULL == pg_cache->committed_page_index.JudyL_array);
+ fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
bytes_freed += ret_Judy;
for (page_index = pg_cache->metrics_index.last_page_index ;
@@ -1099,14 +1164,14 @@ void free_page_cache(struct rrdengine_instance *ctx)
/* Free page index */
ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
- assert(NULL == page_index->JudyL_array);
+ fatal_assert(NULL == page_index->JudyL_array);
bytes_freed += ret_Judy;
freez(page_index);
bytes_freed += sizeof(*page_index);
}
/* Free metrics index */
ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
- assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
bytes_freed += ret_Judy;
info("Freed %lu bytes of memory from page cache.", bytes_freed);