diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 69 |
1 files changed, 14 insertions, 55 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 36d917541..896d71f16 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -5,6 +5,8 @@ rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t global_pg_cache_warnings = 0; +rrdeng_stats_t global_pg_cache_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; void sanity_check(void) @@ -251,13 +253,10 @@ void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; - struct page_cache *pg_cache = &ctx->pg_cache; struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; - int ret; unsigned i, count; - Word_t commit_id; xt_io_descr = req->data; if (req->result < 0) { @@ -277,13 +276,6 @@ void flush_pages_cb(uv_fs_t* req) /* care, we don't hold the descriptor mutex */ descr = xt_io_descr->descr_array[i]; - uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); - commit_id = xt_io_descr->descr_commit_idx_array[i]; - ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0); - assert(1 == ret); - --pg_cache->commited_page_index.nr_commited_pages; - uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - pg_cache_replaceQ_insert(ctx, descr); rrdeng_page_descr_mutex_lock(ctx, descr); @@ -331,7 +323,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct if (force) { debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); } - uv_rwlock_rdlock(&pg_cache->commited_page_index.lock); + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); for (Index = 0, count = 0, uncompressed_payload_length = 0, PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue ; @@ -340,11 +332,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { + uint8_t page_write_pending; + assert(0 != descr->page_length); + page_write_pending = 0; rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { + page_write_pending = 1; /* care, no reference being held */ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; uncompressed_payload_length += descr->page_length; @@ -352,8 +348,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct eligible_pages[count++] = descr; } rrdeng_page_descr_mutex_unlock(ctx, descr); + + if (page_write_pending) { + ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0); + assert(1 == ret); + --pg_cache->commited_page_index.nr_commited_pages; + } } - uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); + uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); if (!count) { debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); @@ -813,47 +815,6 @@ error_after_loop_init: complete(&ctx->rrdengine_completion); } - -#define NR_PAGES (256) -static void basic_functional_test(struct rrdengine_instance *ctx) -{ - int i, j, failed_validations; - uuid_t uuid[NR_PAGES]; - void *buf; - struct rrdeng_page_descr *handle[NR_PAGES]; - char uuid_str[UUID_STR_LEN]; - char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */ - - for (i = 0 ; i < NR_PAGES ; ++i) { - uuid_generate(uuid[i]); - uuid_unparse_lower(uuid[i], uuid_str); -// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); - buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]); - /* Each page contains 10 times its own UUID stringified */ - for (j = 0 ; j < 100 ; ++j) { - strcpy(buf + UUID_STR_LEN * j, uuid_str); - strcpy(backup[i] + UUID_STR_LEN * j, uuid_str); - } - rrdeng_commit_page(ctx, handle[i], (Word_t)i); - } - fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES); - failed_validations = 0; - for (i = 0 ; i < NR_PAGES ; ++i) { - buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]); - if (NULL == buf) { - ++failed_validations; - fprintf(stderr, "Page %d was LOST.\n", i); - } - if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) { - ++failed_validations; - fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); - } - rrdeng_put_page(ctx, handle[i]); - } - fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n", - NR_PAGES - failed_validations, NR_PAGES); - -} /* C entry point for development purposes * make "LDFLAGS=-errdengine_main" */ @@ -866,8 +827,6 @@ void rrdengine_main(void) if (ret) { exit(ret); } - basic_functional_test(ctx); - rrdeng_exit(ctx); fprintf(stderr, "Hello world!"); exit(0); |