summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c69
1 files changed, 14 insertions, 55 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 36d917541..896d71f16 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -5,6 +5,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t global_pg_cache_warnings = 0;
+rrdeng_stats_t global_pg_cache_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
void sanity_check(void)
@@ -251,13 +253,10 @@ void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- int ret;
unsigned i, count;
- Word_t commit_id;
xt_io_descr = req->data;
if (req->result < 0) {
@@ -277,13 +276,6 @@ void flush_pages_cb(uv_fs_t* req)
/* care, we don't hold the descriptor mutex */
descr = xt_io_descr->descr_array[i];
- uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
- commit_id = xt_io_descr->descr_commit_idx_array[i];
- ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0);
- assert(1 == ret);
- --pg_cache->commited_page_index.nr_commited_pages;
- uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
-
pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
@@ -331,7 +323,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (force) {
debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
}
- uv_rwlock_rdlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
for (Index = 0, count = 0, uncompressed_payload_length = 0,
PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
@@ -340,11 +332,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ uint8_t page_write_pending;
+
assert(0 != descr->page_length);
+ page_write_pending = 0;
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
+ page_write_pending = 1;
/* care, no reference being held */
pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
uncompressed_payload_length += descr->page_length;
@@ -352,8 +348,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
eligible_pages[count++] = descr;
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ if (page_write_pending) {
+ ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0);
+ assert(1 == ret);
+ --pg_cache->commited_page_index.nr_commited_pages;
+ }
}
- uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
if (!count) {
debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
@@ -813,47 +815,6 @@ error_after_loop_init:
complete(&ctx->rrdengine_completion);
}
-
-#define NR_PAGES (256)
-static void basic_functional_test(struct rrdengine_instance *ctx)
-{
- int i, j, failed_validations;
- uuid_t uuid[NR_PAGES];
- void *buf;
- struct rrdeng_page_descr *handle[NR_PAGES];
- char uuid_str[UUID_STR_LEN];
- char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */
-
- for (i = 0 ; i < NR_PAGES ; ++i) {
- uuid_generate(uuid[i]);
- uuid_unparse_lower(uuid[i], uuid_str);
-// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str);
- buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]);
- /* Each page contains 10 times its own UUID stringified */
- for (j = 0 ; j < 100 ; ++j) {
- strcpy(buf + UUID_STR_LEN * j, uuid_str);
- strcpy(backup[i] + UUID_STR_LEN * j, uuid_str);
- }
- rrdeng_commit_page(ctx, handle[i], (Word_t)i);
- }
- fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES);
- failed_validations = 0;
- for (i = 0 ; i < NR_PAGES ; ++i) {
- buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]);
- if (NULL == buf) {
- ++failed_validations;
- fprintf(stderr, "Page %d was LOST.\n", i);
- }
- if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) {
- ++failed_validations;
- fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i);
- }
- rrdeng_put_page(ctx, handle[i]);
- }
- fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n",
- NR_PAGES - failed_validations, NR_PAGES);
-
-}
/* C entry point for development purposes
* make "LDFLAGS=-errdengine_main"
*/
@@ -866,8 +827,6 @@ void rrdengine_main(void)
if (ret) {
exit(ret);
}
- basic_functional_test(ctx);
-
rrdeng_exit(ctx);
fprintf(stderr, "Hello world!");
exit(0);