summaryrefslogtreecommitdiffstats
path: root/database/engine/pagecache.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/pagecache.c317
1 files changed, 200 insertions, 117 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index c90947a67..124f2448b 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -8,28 +8,29 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
if (likely(NULL != pg_cache->replaceQ.tail)) {
- descr->prev = pg_cache->replaceQ.tail;
- pg_cache->replaceQ.tail->next = descr;
+ pg_cache_descr->prev = pg_cache->replaceQ.tail;
+ pg_cache->replaceQ.tail->next = pg_cache_descr;
}
if (unlikely(NULL == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = descr;
+ pg_cache->replaceQ.head = pg_cache_descr;
}
- pg_cache->replaceQ.tail = descr;
+ pg_cache->replaceQ.tail = pg_cache_descr;
}
static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *prev, *next;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;
- prev = descr->prev;
- next = descr->next;
+ prev = pg_cache_descr->prev;
+ next = pg_cache_descr->next;
if (likely(NULL != prev)) {
prev->next = next;
@@ -37,17 +38,17 @@ static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ct
if (likely(NULL != next)) {
next->prev = prev;
}
- if (unlikely(descr == pg_cache->replaceQ.head)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
pg_cache->replaceQ.head = next;
}
- if (unlikely(descr == pg_cache->replaceQ.tail)) {
+ if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
pg_cache->replaceQ.tail = prev;
}
- descr->prev = descr->next = NULL;
+ pg_cache_descr->prev = pg_cache_descr->next = NULL;
}
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -57,7 +58,7 @@ void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
}
void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -66,7 +67,7 @@ void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -76,40 +77,28 @@ void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
-struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
+struct rrdeng_page_descr *pg_cache_create_descr(void)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
descr = mallocz(sizeof(*descr));
- descr->page = NULL;
descr->page_length = 0;
descr->start_time = INVALID_TIME;
descr->end_time = INVALID_TIME;
descr->id = NULL;
descr->extent = NULL;
- descr->flags = 0;
- descr->prev = descr->next = descr->private = NULL;
- descr->refcnt = 0;
- descr->waiters = 0;
- descr->handle = NULL;
- assert(0 == uv_cond_init(&descr->cond));
- assert(0 == uv_mutex_init(&descr->mutex));
+ descr->pg_cache_descr_state = 0;
+ descr->pg_cache_descr = NULL;
return descr;
}
-void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
-{
- uv_cond_destroy(&descr->cond);
- uv_mutex_destroy(&descr->mutex);
- free(descr);
-}
-
/* The caller must hold page descriptor lock. */
-void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
- if (descr->waiters)
- uv_cond_broadcast(&descr->cond);
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->waiters)
+ uv_cond_broadcast(&pg_cache_descr->cond);
}
/*
@@ -117,11 +106,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
- ++descr->waiters;
- uv_cond_wait(&descr->cond, &descr->mutex);
- --descr->waiters;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ ++pg_cache_descr->waiters;
+ uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
+ --pg_cache_descr->waiters;
}
/*
@@ -129,14 +120,15 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
-unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
+unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
unsigned long flags;
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wait_event_unsafe(descr);
- flags = descr->flags;
- uv_mutex_unlock(&descr->mutex);
+ flags = pg_cache_descr->flags;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
return flags;
}
@@ -146,15 +138,17 @@ unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
if (exclusive_access)
- descr->flags |= RRD_PAGE_LOCKED;
- ++descr->refcnt;
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -163,10 +157,12 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold page descriptor lock.
* Same return values as pg_cache_try_get_unsafe() without doing anything.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
+ (exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
@@ -177,23 +173,24 @@ int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive
* The caller must hold the page descriptor lock.
* This function may block doing cleanup.
*/
-void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
- descr->flags &= ~RRD_PAGE_LOCKED;
- if (0 == --descr->refcnt) {
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
+ if (0 == --pg_cache_descr->refcnt) {
pg_cache_wake_up_waiters_unsafe(descr);
}
- /* TODO: perform cleanup */
}
/*
* This function may block doing cleanup.
*/
-void pg_cache_put(struct rrdeng_page_cache_descr *descr)
+void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
}
/* The caller must hold the page cache lock */
@@ -224,7 +221,7 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
- debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
number);
while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
@@ -266,7 +263,7 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
debug(D_RRDENGINE,
- "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ "==Page cache full. Trying to reserve %u pages.==",
number);
do {
if (!pg_cache_try_evict_one_page_unsafe(ctx))
@@ -286,18 +283,20 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
}
/* The caller must hold the page cache and the page descriptor locks in that order */
-static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
- free(descr->page);
- descr->page = NULL;
- descr->flags &= ~RRD_PAGE_POPULATED;
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+
+ freez(pg_cache_descr->page);
+ pg_cache_descr->page = NULL;
+ pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
++ctx->stats.pg_cache_evictions;
}
/*
* The caller must hold the page cache lock.
- * Lock order: page cache -> replaceQ -> descriptor
+ * Lock order: page cache -> replaceQ -> page descriptor
* This function iterates all pages and tries to evict one.
* If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
* or it sets it to NULL if no write-back is in progress.
@@ -308,36 +307,40 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long old_flags;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr = NULL;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
- uv_mutex_lock(&descr->mutex);
- old_flags = descr->flags;
+ for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
+ descr = pg_cache_descr->descr;
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ old_flags = pg_cache_descr->flags;
if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
/* must evict */
pg_cache_evict_unsafe(ctx, descr);
pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
pg_cache_replaceQ_delete_unsafe(ctx, descr);
+
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+
return 1;
}
- uv_mutex_unlock(&descr->mutex);
- };
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
/* failed to evict */
return 0;
}
-/*
- * TODO: last waiter frees descriptor ?
- */
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
{
struct page_cache *pg_cache = &ctx->pg_cache;
+ struct page_cache_descr *pg_cache_descr = NULL;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
int ret;
@@ -353,8 +356,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
uv_rwlock_wrunlock(&page_index->lock);
if (unlikely(0 == ret)) {
error("Page under deletion was not in index.");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
+ if (unlikely(debug_flags & D_RRDENGINE)) {
+ print_page_descr(descr);
+ }
goto destroy;
}
assert(1 == ret);
@@ -364,23 +368,26 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
--pg_cache->page_descriptors;
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
- }
- /* even a locked page could be dirty */
- while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
- uv_mutex_unlock(&descr->mutex);
+ if (!remove_dirty) {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- if (descr->flags & RRD_PAGE_POPULATED) {
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
/* only after locking can it be safely deleted from LRU */
pg_cache_replaceQ_delete(ctx, descr);
@@ -388,13 +395,15 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cach
pg_cache_evict_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
+
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
destroy:
- pg_cache_destroy_descr(descr);
+ freez(descr);
pg_cache_update_metric_times(page_index);
}
-static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
+static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
usec_t pg_start, pg_end;
@@ -405,13 +414,13 @@ static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, u
(pg_start >= start_time && pg_start <= end_time);
}
-static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
+static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
/* Update metric oldest and latest timestamps efficiently when adding new values */
-void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
+void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
usec_t oldest_time = page_index->oldest_time;
usec_t latest_time = page_index->latest_time;
@@ -429,7 +438,7 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
Pvoid_t *firstPValue, *lastPValue;
Word_t firstIndex, lastIndex;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
usec_t oldest_time = INVALID_TIME;
usec_t latest_time = INVALID_TIME;
@@ -460,16 +469,23 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr)
+ struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
+ unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;
+
+ if (0 != pg_cache_descr_state) {
+ /* there is page cache descriptor pre-allocated state */
+ struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if (descr->flags & RRD_PAGE_POPULATED) {
- pg_cache_reserve_pages(ctx, 1);
- if (!(descr->flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_insert(ctx, descr);
+ assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ pg_cache_reserve_pages(ctx, 1);
+ if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
+ pg_cache_replaceQ_insert(ctx, descr);
+ }
}
if (unlikely(NULL == index)) {
@@ -503,7 +519,8 @@ struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
+ struct page_cache_descr *pg_cache_descr = NULL;
int i, j, k, count, found;
unsigned long flags;
Pvoid_t *PValue;
@@ -557,12 +574,13 @@ struct pg_cache_page_index *
if (unlikely(0 == descr->page_length))
continue;
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if (pg_cache_can_get_unsafe(descr, 0)) {
if (flags & RRD_PAGE_POPULATED) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
continue;
}
@@ -570,19 +588,19 @@ struct pg_cache_page_index *
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
preload_array[count++] = descr;
if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
break;
}
}
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
- };
+ }
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
struct rrdeng_cmd cmd;
- struct rrdeng_page_cache_descr *next;
+ struct rrdeng_page_descr *next;
descr = preload_array[i];
if (NULL == descr) {
@@ -622,7 +640,7 @@ struct pg_cache_page_index *
if (NULL == descr) {
continue;
}
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
if (!count) {
@@ -637,12 +655,13 @@ struct pg_cache_page_index *
* When point_in_time is INVALID_TIME get any page.
* If index is NULL lookup by UUID (id).
*/
-struct rrdeng_page_cache_descr *
+struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
usec_t point_in_time)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL;
+ struct rrdeng_page_descr *descr = NULL;
+ struct page_cache_descr *pg_cache_descr = NULL;
unsigned long flags;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
@@ -682,11 +701,12 @@ struct rrdeng_page_cache_descr *
pg_cache_release_pages(ctx, 1);
return NULL;
}
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ flags = pg_cache_descr->flags;
if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
/* success */
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
break;
}
@@ -702,14 +722,14 @@ struct rrdeng_page_cache_descr *
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
if(unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
- while (!(descr->flags & RRD_PAGE_POPULATED)) {
+ while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
pg_cache_wait_event_unsafe(descr);
}
/* success */
/* Downgrade exclusive reference to allow other readers */
- descr->flags &= ~RRD_PAGE_LOCKED;
+ pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
return descr;
}
@@ -720,7 +740,7 @@ struct rrdeng_page_cache_descr *
if (!(flags & RRD_PAGE_POPULATED))
page_not_in_cache = 1;
pg_cache_wait_event_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
/* reset scan to find again */
uv_rwlock_rdlock(&page_index->lock);
@@ -747,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
+ page_index->prev = NULL;
return page_index;
}
@@ -756,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ pg_cache->metrics_index.last_page_index = NULL;
assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
@@ -789,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
+}
+
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Word_t ret_Judy, bytes_freed = 0;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index, *prev_page_index;
+ Word_t Index;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Free commited page index */
+ ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
+ assert(NULL == pg_cache->commited_page_index.JudyL_array);
+ bytes_freed += ret_Judy;
+
+ for (page_index = pg_cache->metrics_index.last_page_index ;
+ page_index != NULL ;
+ page_index = prev_page_index) {
+ prev_page_index = page_index->prev;
+
+ /* Find first page in range */
+ Index = (Word_t) 0;
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ while (descr != NULL) {
+ /* Iterate all page descriptors of this metric */
+
+ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ /* Check rrdenglocking.c */
+ pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ freez(pg_cache_descr->page);
+ bytes_freed += RRDENG_BLOCK_SIZE;
+ }
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ bytes_freed += sizeof(*pg_cache_descr);
+ }
+ freez(descr);
+ bytes_freed += sizeof(*descr);
+
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ }
+
+ /* Free page index */
+ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
+ assert(NULL == page_index->JudyL_array);
+ bytes_freed += ret_Judy;
+ freez(page_index);
+ bytes_freed += sizeof(*page_index);
+ }
+ /* Free metrics index */
+ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
+ assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ bytes_freed += ret_Judy;
+
+ info("Freed %lu bytes of memory from page cache.", bytes_freed);
} \ No newline at end of file