summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c56
1 files changed, 39 insertions, 17 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 896d71f16..b6b6548ec 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -5,9 +5,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
-rrdeng_stats_t global_pg_cache_warnings = 0;
-rrdeng_stats_t global_pg_cache_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+rrdeng_stats_t global_flushing_errors = 0;
void sanity_check(void)
{
@@ -253,6 +252,7 @@ void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
@@ -290,6 +290,10 @@ void flush_pages_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
freez(xt_io_descr);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ pg_cache->committed_page_index.nr_committed_pages -= count;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
}
/*
@@ -323,14 +327,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (force) {
debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
}
- uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
for (Index = 0, count = 0, uncompressed_payload_length = 0,
- PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
descr != NULL && count != MAX_PAGES_PER_EXTENT ;
- PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
uint8_t page_write_pending;
@@ -350,12 +354,11 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
rrdeng_page_descr_mutex_unlock(ctx, descr);
if (page_write_pending) {
- ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0);
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
assert(1 == ret);
- --pg_cache->commited_page_index.nr_commited_pages;
}
}
- uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (!count) {
debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
@@ -648,6 +651,9 @@ void async_cb(uv_async_t *handle)
debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
@@ -657,12 +663,31 @@ void timer_cb(uv_timer_t* handle)
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting.data)) {
- unsigned total_bytes, bytes_written;
-
/* There is free space so we can write to disk */
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
+ high_watermark;
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->stats.metric_API_producers;
+ /* are flushable pages more than 25% of the maximum page cache size */
+ high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
+ low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
+
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+
debug(D_RRDENGINE, "Flushing pages to disk.");
for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
- bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ bytes_written && (total_bytes < bytes_to_write) ;
total_bytes += bytes_written) {
bytes_written = do_flush_pages(wc, 0, NULL);
}
@@ -675,9 +700,6 @@ void timer_cb(uv_timer_t* handle)
#endif
}
-/* Flushes dirty pages when timer expires */
-#define TIMER_PERIOD_MS (1000)
-
#define MAX_CMD_BATCH_SIZE (256)
void rrdeng_worker(void* arg)
@@ -771,8 +793,8 @@ void rrdeng_worker(void* arg)
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
if (bytes_written) {
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
+ while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
+ ; /* Force flushing of all committed pages if there is free space. */
}
}
break;
@@ -789,7 +811,7 @@ void rrdeng_worker(void* arg)
}
info("Shutting down RRD engine event loop.");
while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
+ ; /* Force flushing of all committed pages. */
}
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);