diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 53 |
1 files changed, 50 insertions, 3 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index a975cfa6..9f43f445 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -11,8 +11,24 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0; static unsigned pages_per_extent = MAX_PAGES_PER_EXTENT; +#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) +#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) +#endif + +void *dbengine_page_alloc() { + void *page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); + if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); + return page; +} + +void dbengine_page_free(void *page) { + munmap(page, RRDENG_BLOCK_SIZE); +} + static void sanity_check(void) { + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)); + /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); @@ -176,7 +192,7 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str struct extent_info *extent = xt_io_descr->descr_array[0]->extent; for (i = 0 ; i < xt_io_descr->descr_count; ++i) { - page = mallocz(RRDENG_BLOCK_SIZE); + page = dbengine_page_alloc(); descr = xt_io_descr->descr_array[i]; for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { /* care, we don't hold the descriptor mutex */ @@ -331,7 +347,7 @@ after_crc_check: continue; /* Failed to reserve a suitable page */ is_prefetched_page = 1; } - page = mallocz(RRDENG_BLOCK_SIZE); + page = dbengine_page_alloc(); /* care, we don't hold the descriptor mutex */ if (have_read_error) { @@ -735,6 +751,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct fatal("posix_memalign:%s", strerror(ret)); /* freez(xt_io_descr);*/ } + memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes)); (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; @@ -1074,13 +1091,17 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { + worker_is_busy(RRDENG_MAX_OPCODE + 1); + struct rrdengine_worker_config* wc = handle->data; struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); - if (unlikely(!ctx->metalog_ctx->initialized)) + if (unlikely(!ctx->metalog_ctx->initialized)) { + worker_is_idle(); return; /* Wait for the metadata log to initialize */ + } rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { @@ -1122,12 +1143,26 @@ void timer_cb(uv_timer_t* handle) debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); } #endif + + worker_is_idle(); } #define MAX_CMD_BATCH_SIZE (256) void rrdeng_worker(void* arg) { + worker_register("DBENGINE"); + worker_register_job_name(RRDENG_NOOP, "noop"); + worker_register_job_name(RRDENG_READ_PAGE, "page read"); + worker_register_job_name(RRDENG_READ_EXTENT, "extent read"); + worker_register_job_name(RRDENG_COMMIT_PAGE, "commit"); + worker_register_job_name(RRDENG_FLUSH_PAGES, "flush"); + worker_register_job_name(RRDENG_SHUTDOWN, "shutdown"); + worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru"); + worker_register_job_name(RRDENG_QUIESCE, "quiesce"); + worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup"); + worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer"); + struct rrdengine_worker_config* wc = arg; struct rrdengine_instance *ctx = wc->ctx; uv_loop_t* loop; @@ -1175,8 +1210,11 @@ void rrdeng_worker(void* arg) fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; + int set_name = 0; while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { + worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); + worker_is_busy(RRDENG_MAX_OPCODE); rrdeng_cleanup_finished_threads(wc); /* wait for commands */ @@ -1193,6 +1231,9 @@ void rrdeng_worker(void* arg) opcode = cmd.opcode; ++cmd_batch_size; + if(likely(opcode != RRDENG_NOOP)) + worker_is_busy(opcode); + switch (opcode) { case RRDENG_NOOP: /* the command queue was empty, do nothing */ @@ -1219,6 +1260,10 @@ void rrdeng_worker(void* arg) break; case RRDENG_READ_EXTENT: do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + if (unlikely(!set_name)) { + set_name = 1; + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + } break; case RRDENG_COMMIT_PAGE: do_commit_transaction(wc, STORE_DATA, NULL); @@ -1265,6 +1310,7 @@ void rrdeng_worker(void* arg) fatal_assert(0 == uv_loop_close(loop)); freez(loop); + worker_unregister(); return; error_after_timer_init: @@ -1277,6 +1323,7 @@ error_after_loop_init: wc->error = UV_EAGAIN; /* wake up initialization thread */ completion_mark_complete(&ctx->rrdengine_completion); + worker_unregister(); } /* C entry point for development purposes |