summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c117
1 files changed, 47 insertions, 70 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 99257b79d..b82cc1ad1 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -40,6 +40,7 @@ struct rrdeng_main {
uv_async_t async;
uv_timer_t timer;
pid_t tid;
+ bool shutdown;
size_t flushes_running;
size_t evictions_running;
@@ -577,55 +578,6 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
// ----------------------------------------------------------------------------
-struct {
- ARAL *aral[RRD_STORAGE_TIERS];
-} dbengine_page_alloc_globals = {};
-
-static inline ARAL *page_size_lookup(size_t size) {
- for(size_t tier = 0; tier < storage_tiers ;tier++)
- if(size == tier_page_size[tier])
- return dbengine_page_alloc_globals.aral[tier];
-
- return NULL;
-}
-
-static void dbengine_page_alloc_init(void) {
- for(size_t i = storage_tiers; i > 0 ;i--) {
- size_t tier = storage_tiers - i;
-
- char buf[20 + 1];
- snprintfz(buf, 20, "tier%zu-pages", tier);
-
- dbengine_page_alloc_globals.aral[tier] = aral_create(
- buf,
- tier_page_size[tier],
- 64,
- 512 * tier_page_size[tier],
- pgc_aral_statistics(),
- NULL, NULL, false, false);
- }
-}
-
-void *dbengine_page_alloc(size_t size) {
- ARAL *ar = page_size_lookup(size);
- if(ar) return aral_mallocz(ar);
-
- return mallocz(size);
-}
-
-void dbengine_page_free(void *page, size_t size __maybe_unused) {
- if(unlikely(!page || page == DBENGINE_EMPTY_PAGE))
- return;
-
- ARAL *ar = page_size_lookup(size);
- if(ar)
- aral_freez(ar, page);
- else
- freez(page);
-}
-
-// ----------------------------------------------------------------------------
-
void *dbengine_extent_alloc(size_t size) {
void *extent = mallocz(size);
return extent;
@@ -890,12 +842,25 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
header->descr[i].start_time_ut = descr->start_time_ut;
- header->descr[i].end_time_ut = descr->end_time_ut;
+
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ header->descr[i].end_time_ut = descr->end_time_ut;
+ break;
+ case PAGE_GORILLA_METRICS:
+ header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
+ header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
+ break;
+ default:
+ fatal("Unknown page type: %uc", descr->type);
+ }
+
pos += sizeof(header->descr[i]);
}
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
- (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
@@ -1381,9 +1346,6 @@ static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, vo
static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN);
- completion_wait_for(&ctx->quiesce.completion);
- completion_destroy(&ctx->quiesce.completion);
-
bool logged = false;
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) ||
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
@@ -1436,6 +1398,14 @@ uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx)
{
+ if(!ctx->datafiles.first)
+ // no datafiles available
+ return false;
+
+ if(!ctx->datafiles.first->next)
+ // only 1 datafile available
+ return false;
+
uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) -
(ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0);
@@ -1514,12 +1484,19 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
spinlock_unlock(&datafile->writers.spinlock);
if(!available) {
- netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - "
+ "skipping it for now",
+ datafile->fileno);
+
datafile = datafile->next;
continue;
}
- netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "DBENGINE: journal file %u is ready to be indexed",
+ datafile->fileno);
+
pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
@@ -1532,7 +1509,10 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
}
errno = 0;
- internal_error(count, "DBENGINE: journal indexing done; %u files processed", count);
+ if(count)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "DBENGINE: journal indexing done; %u files processed",
+ count);
worker_is_idle();
@@ -1628,7 +1608,7 @@ static void dbengine_initialize_structures(void) {
rrdeng_query_handle_init();
page_descriptors_init();
extent_buffer_init();
- dbengine_page_alloc_init();
+ pgd_init_arals();
extent_io_descriptor_init();
}
@@ -1715,6 +1695,7 @@ void dbengine_event_loop(void* arg) {
worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init");
worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown");
worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce");
+ worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown");
worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode");
@@ -1856,6 +1837,13 @@ void dbengine_event_loop(void* arg) {
break;
}
+ case RRDENG_OPCODE_SHUTDOWN_EVLOOP: {
+ uv_close((uv_handle_t *)&main->async, NULL);
+ (void) uv_timer_stop(&main->timer);
+ uv_close((uv_handle_t *)&main->timer, NULL);
+ shutdown = true;
+ }
+
case RRDENG_OPCODE_NOOP: {
/* the command queue was empty, do nothing */
break;
@@ -1872,18 +1860,7 @@ void dbengine_event_loop(void* arg) {
} while (opcode != RRDENG_OPCODE_NOOP);
}
- /* cleanup operations of the event loop */
- netdata_log_info("DBENGINE: shutting down dbengine thread");
-
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&main->async, NULL);
- uv_timer_stop(&main->timer);
- uv_close((uv_handle_t *)&main->timer, NULL);
- uv_run(&main->loop, UV_RUN_DEFAULT);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread");
uv_loop_close(&main->loop);
worker_unregister();
}