summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c53
1 files changed, 50 insertions, 3 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index a975cfa6e..9f43f4456 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -11,8 +11,24 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT;
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)
+#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
+#endif
+
+void *dbengine_page_alloc() {
+ void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
+ if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
+ return page;
+}
+
+void dbengine_page_free(void *page) {
+ munmap(page, RRDENG_BLOCK_SIZE);
+}
+
static void sanity_check(void)
{
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2));
+
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
@@ -176,7 +192,7 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
- page = mallocz(RRDENG_BLOCK_SIZE);
+ page = dbengine_page_alloc();
descr = xt_io_descr->descr_array[i];
for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
/* care, we don't hold the descriptor mutex */
@@ -331,7 +347,7 @@ after_crc_check:
continue; /* Failed to reserve a suitable page */
is_prefetched_page = 1;
}
- page = mallocz(RRDENG_BLOCK_SIZE);
+ page = dbengine_page_alloc();
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
@@ -735,6 +751,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
fatal("posix_memalign:%s", strerror(ret));
/* freez(xt_io_descr);*/
}
+ memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
(void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
@@ -1074,13 +1091,17 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
+ worker_is_busy(RRDENG_MAX_OPCODE + 1);
+
struct rrdengine_worker_config* wc = handle->data;
struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
- if (unlikely(!ctx->metalog_ctx->initialized))
+ if (unlikely(!ctx->metalog_ctx->initialized)) {
+ worker_is_idle();
return; /* Wait for the metadata log to initialize */
+ }
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
@@ -1122,12 +1143,26 @@ void timer_cb(uv_timer_t* handle)
debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
+
+ worker_is_idle();
}
#define MAX_CMD_BATCH_SIZE (256)
void rrdeng_worker(void* arg)
{
+ worker_register("DBENGINE");
+ worker_register_job_name(RRDENG_NOOP, "noop");
+ worker_register_job_name(RRDENG_READ_PAGE, "page read");
+ worker_register_job_name(RRDENG_READ_EXTENT, "extent read");
+ worker_register_job_name(RRDENG_COMMIT_PAGE, "commit");
+ worker_register_job_name(RRDENG_FLUSH_PAGES, "flush");
+ worker_register_job_name(RRDENG_SHUTDOWN, "shutdown");
+ worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru");
+ worker_register_job_name(RRDENG_QUIESCE, "quiesce");
+ worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup");
+ worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer");
+
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
@@ -1175,8 +1210,11 @@ void rrdeng_worker(void* arg)
fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
+ int set_name = 0;
while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
+ worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
+ worker_is_busy(RRDENG_MAX_OPCODE);
rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
@@ -1193,6 +1231,9 @@ void rrdeng_worker(void* arg)
opcode = cmd.opcode;
++cmd_batch_size;
+ if(likely(opcode != RRDENG_NOOP))
+ worker_is_busy(opcode);
+
switch (opcode) {
case RRDENG_NOOP:
/* the command queue was empty, do nothing */
@@ -1219,6 +1260,10 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_READ_EXTENT:
do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
+ if (unlikely(!set_name)) {
+ set_name = 1;
+ uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+ }
break;
case RRDENG_COMMIT_PAGE:
do_commit_transaction(wc, STORE_DATA, NULL);
@@ -1265,6 +1310,7 @@ void rrdeng_worker(void* arg)
fatal_assert(0 == uv_loop_close(loop));
freez(loop);
+ worker_unregister();
return;
error_after_timer_init:
@@ -1277,6 +1323,7 @@ error_after_loop_init:
wc->error = UV_EAGAIN;
/* wake up initialization thread */
completion_mark_complete(&ctx->rrdengine_completion);
+ worker_unregister();
}
/* C entry point for development purposes