diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 56 |
1 files changed, 39 insertions, 17 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 896d71f16..b6b6548ec 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -5,9 +5,8 @@ rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; -rrdeng_stats_t global_pg_cache_warnings = 0; -rrdeng_stats_t global_pg_cache_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; +rrdeng_stats_t global_flushing_errors = 0; void sanity_check(void) { @@ -253,6 +252,7 @@ void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; @@ -290,6 +290,10 @@ void flush_pages_cb(uv_fs_t* req) uv_fs_req_cleanup(req); free(xt_io_descr->buf); freez(xt_io_descr); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + pg_cache->committed_page_index.nr_committed_pages -= count; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); } /* @@ -323,14 +327,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct if (force) { debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); } - uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); for (Index = 0, count = 0, uncompressed_payload_length = 0, - PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue ; descr != NULL && count != MAX_PAGES_PER_EXTENT ; - PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { uint8_t page_write_pending; @@ -350,12 +354,11 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct rrdeng_page_descr_mutex_unlock(ctx, descr); if (page_write_pending) { - ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0); + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); assert(1 == ret); - --pg_cache->commited_page_index.nr_commited_pages; } } - uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); if (!count) { debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); @@ -648,6 +651,9 @@ void async_cb(uv_async_t *handle) debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } +/* Flushes dirty pages when timer expires */ +#define TIMER_PERIOD_MS (1000) + void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; @@ -657,12 +663,31 @@ void timer_cb(uv_timer_t* handle) rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting.data)) { - unsigned total_bytes, bytes_written; - /* There is free space so we can write to disk */ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, + high_watermark; + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + producers = ctx->stats.metric_API_producers; + /* are flushable pages more than 25% of the maximum page cache size */ + high_watermark = (ctx->max_cache_pages * 25LLU) / 100; + low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ + + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + debug(D_RRDENGINE, "Flushing pages to disk."); for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; - bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; + bytes_written && (total_bytes < bytes_to_write) ; total_bytes += bytes_written) { bytes_written = do_flush_pages(wc, 0, NULL); } @@ -675,9 +700,6 @@ void timer_cb(uv_timer_t* handle) #endif } -/* Flushes dirty pages when timer expires */ -#define TIMER_PERIOD_MS (1000) - #define MAX_CMD_BATCH_SIZE (256) void rrdeng_worker(void* arg) @@ -771,8 +793,8 @@ void rrdeng_worker(void* arg) /* First I/O should be enough to call completion */ bytes_written = do_flush_pages(wc, 1, cmd.completion); if (bytes_written) { - while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all commited pages. */ + while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) { + ; /* Force flushing of all committed pages if there is free space. */ } } break; @@ -789,7 +811,7 @@ void rrdeng_worker(void* arg) } info("Shutting down RRD engine event loop."); while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all commited pages. */ + ; /* Force flushing of all committed pages. */ } wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); |