diff options
Diffstat (limited to 'database/engine/rrdenglocking.c')
-rw-r--r-- | database/engine/rrdenglocking.c | 56 |
1 files changed, 32 insertions, 24 deletions
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c index 0eb9019b4..a23abf307 100644 --- a/database/engine/rrdenglocking.c +++ b/database/engine/rrdenglocking.c @@ -12,8 +12,8 @@ struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance pg_cache_descr->prev = pg_cache_descr->next = NULL; pg_cache_descr->refcnt = 0; pg_cache_descr->waiters = 0; - assert(0 == uv_cond_init(&pg_cache_descr->cond)); - assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); + fatal_assert(0 == uv_cond_init(&pg_cache_descr->cond)); + fatal_assert(0 == uv_mutex_init(&pg_cache_descr->mutex)); return pg_cache_descr; } @@ -39,7 +39,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(we_locked)) { - assert(old_state & PG_CACHE_DESCR_LOCKED); + fatal_assert(old_state & PG_CACHE_DESCR_LOCKED); new_state = (1 << PG_CACHE_DESCR_SHIFT) | PG_CACHE_DESCR_ALLOCATED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { @@ -49,7 +49,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ continue; /* spin */ } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } if (0 == old_state) { @@ -71,8 +71,9 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ continue; /* spin */ } /* page cache descriptor is already allocated */ - assert(old_state & PG_CACHE_DESCR_ALLOCATED); - + if (unlikely(!(old_state & PG_CACHE_DESCR_ALLOCATED))) { + fatal("Invalid page cache descriptor locking state:%#lX", old_state); + } new_state = (old_users + 1) << PG_CACHE_DESCR_SHIFT; new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; @@ -94,7 +95,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { unsigned long old_state, new_state, ret_state, old_users; - struct page_cache_descr *pg_cache_descr; + struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL; uint8_t we_locked; uv_mutex_unlock(&descr->pg_cache_descr->mutex); @@ -105,35 +106,39 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(we_locked)) { - assert(0 == old_users); + fatal_assert(0 == old_users); ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0); if (old_state == ret_state) { /* success */ - break; + rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr); + return; } continue; /* spin */ } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } - assert(old_state & PG_CACHE_DESCR_ALLOCATED); + fatal_assert(old_state & PG_CACHE_DESCR_ALLOCATED); pg_cache_descr = descr->pg_cache_descr; /* caller is the only page cache descriptor user and there are no pending references on the page */ if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) && !pg_cache_descr->flags && !pg_cache_descr->refcnt) { + fatal_assert(!pg_cache_descr->waiters); + new_state = PG_CACHE_DESCR_LOCKED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { we_locked = 1; - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + delete_pg_cache_descr = pg_cache_descr; + descr->pg_cache_descr = NULL; /* retry */ continue; } continue; /* spin */ } - assert(old_users > 0); + fatal_assert(old_users > 0); new_state = (old_users - 1) << PG_CACHE_DESCR_SHIFT; new_state |= old_state & PG_CACHE_DESCR_FLAGS_MASK; @@ -144,7 +149,6 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden } /* spin */ } - } /* @@ -155,34 +159,36 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) { unsigned long old_state, new_state, ret_state, old_users; - struct page_cache_descr *pg_cache_descr; - uint8_t just_locked, we_freed, must_unlock; + struct page_cache_descr *pg_cache_descr = NULL; + uint8_t just_locked, can_free, must_unlock; just_locked = 0; - we_freed = 0; + can_free = 0; must_unlock = 0; while (1) { /* spin */ old_state = descr->pg_cache_descr_state; old_users = old_state >> PG_CACHE_DESCR_SHIFT; if (unlikely(just_locked)) { - assert(0 == old_users); + fatal_assert(0 == old_users); must_unlock = 1; just_locked = 0; /* Try deallocate if there are no pending references on the page */ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) { - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); - we_freed = 1; + fatal_assert(!pg_cache_descr->waiters); + + descr->pg_cache_descr = NULL; + can_free = 1; /* success */ continue; } continue; /* spin */ } if (unlikely(must_unlock)) { - assert(0 == old_users); + fatal_assert(0 == old_users); - if (we_freed) { + if (can_free) { /* success */ new_state = 0; } else { @@ -192,6 +198,8 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { /* unlocked */ + if (can_free) + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); return; } continue; /* spin */ @@ -201,16 +209,16 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct return; } if (old_state & PG_CACHE_DESCR_LOCKED) { - assert(0 == old_users); + fatal_assert(0 == old_users); continue; /* spin */ } - pg_cache_descr = descr->pg_cache_descr; /* caller is the only page cache descriptor user */ if (0 == old_users) { new_state = old_state | PG_CACHE_DESCR_LOCKED; ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state); if (old_state == ret_state) { just_locked = 1; + pg_cache_descr = descr->pg_cache_descr; /* retry */ continue; } |