summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--  database/engine/rrdengine.c  | 202
1 file changed, 137 insertions(+), 65 deletions(-)
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index d64868f03..7811a5eaa 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
#endif
+struct rrdeng_cmd {
+ struct rrdengine_instance *ctx;
+ enum rrdeng_opcode opcode;
+ void *data;
+ struct completion *completion;
+ enum storage_priority priority;
+ dequeue_callback_t dequeue_cb;
+
+ struct {
+ struct rrdeng_cmd *prev;
+ struct rrdeng_cmd *next;
+ } queue;
+};
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker);
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker);
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker);
+
struct rrdeng_main {
uv_thread_t thread;
uv_loop_t loop;
@@ -45,7 +63,6 @@ struct rrdeng_main {
struct {
size_t dispatched;
size_t executing;
- size_t pending_cb;
} atomics;
} work_cmd;
@@ -132,8 +149,22 @@ static void work_request_init(void) {
);
}
-static inline bool work_request_full(void) {
- return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS);
+enum LIBUV_WORKERS_STATUS {
+ LIBUV_WORKERS_RELAXED,
+ LIBUV_WORKERS_STRESSED,
+ LIBUV_WORKERS_CRITICAL,
+};
+
+static inline enum LIBUV_WORKERS_STATUS work_request_full(void) {
+ size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED);
+
+ if(dispatched >= (size_t)(libuv_worker_threads))
+ return LIBUV_WORKERS_CRITICAL;
+
+ else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS))
+ return LIBUV_WORKERS_STRESSED;
+
+ return LIBUV_WORKERS_RELAXED;
}
static inline void work_done(struct rrdeng_work *work_request) {
@@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) {
worker_is_busy(UV_EVENT_WORKER_INIT);
struct rrdeng_work *work_request = req->data;
+
work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
worker_is_idle();
+ if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) {
+ internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not be boosted");
+
+ while(1) {
+ struct rrdeng_cmd cmd = rrdeng_deq_cmd(true);
+ if (cmd.opcode == RRDENG_OPCODE_NOOP)
+ break;
+
+ worker_is_busy(UV_EVENT_WORKER_INIT);
+ switch (cmd.opcode) {
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, true);
+ break;
+
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, true);
+ break;
+
+ default:
+ fatal("DBENGINE: Opcode should not be executed synchronously");
+ break;
+ }
+ worker_is_idle();
+ }
+ }
+
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
// signal the event loop a worker is available
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
@@ -167,7 +224,6 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
work_done(work_request);
- __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
worker_is_idle();
}
@@ -369,20 +425,6 @@ void wal_release(WAL *wal) {
// ----------------------------------------------------------------------------
// command queue cache
-struct rrdeng_cmd {
- struct rrdengine_instance *ctx;
- enum rrdeng_opcode opcode;
- void *data;
- struct completion *completion;
- enum storage_priority priority;
- dequeue_callback_t dequeue_cb;
-
- struct {
- struct rrdeng_cmd *prev;
- struct rrdeng_cmd *next;
- } queue;
-};
-
static void rrdeng_cmd_queue_init(void) {
rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
sizeof(struct rrdeng_cmd),
@@ -465,14 +507,33 @@ static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PR
return false;
}
-static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
+#define opcode_empty (struct rrdeng_cmd) { \
+ .ctx = NULL, \
+ .opcode = RRDENG_OPCODE_NOOP, \
+ .priority = STORAGE_PRIORITY_BEST_EFFORT, \
+ .completion = NULL, \
+ .data = NULL, \
+}
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
struct rrdeng_cmd *cmd = NULL;
+ enum LIBUV_WORKERS_STATUS status = work_request_full();
+
+ STORAGE_PRIORITY min_priority, max_priority;
+ min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE;
+ max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
- STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT;
+ if(from_worker) {
+ if(status == LIBUV_WORKERS_CRITICAL)
+ return opcode_empty;
+
+ min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
+ max_priority = STORAGE_PRIORITY_BEST_EFFORT;
+ }
// find an opcode to execute from the queue
netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
- for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) {
+ for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
if(cmd) {
@@ -508,13 +569,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
aral_freez(rrdeng_main.cmd_queue.ar, cmd);
}
else
- ret = (struct rrdeng_cmd) {
- .ctx = NULL,
- .opcode = RRDENG_OPCODE_NOOP,
- .priority = STORAGE_PRIORITY_BEST_EFFORT,
- .completion = NULL,
- .data = NULL,
- };
+ ret = opcode_empty;
return ret;
}
@@ -927,11 +982,6 @@ struct uuid_first_time_s {
size_t df_index_oldest;
};
-static int journal_metric_compare(const void *key, const void *metric)
-{
- return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
-}
-
struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
@@ -987,7 +1037,10 @@ void find_uuid_first_time(
if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
continue;
- struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare);
+ struct journal_metric_list *live_entry =
+ bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,
+ sizeof(*uuid_list), journal_metric_uuid_compare);
+
if (!live_entry) {
// Not found in this journal
not_matching_bsearches++;
@@ -1087,13 +1140,20 @@ void find_uuid_first_time(
}
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
-
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile;
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
+
+ if (unlikely(!j2_header)) {
+ if (worker)
+ worker_is_idle();
+ return;
+ }
+
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
+
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
size_t count = j2_header->metric_count;
@@ -1348,14 +1408,9 @@ static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused
return data;
}
-static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
- worker_is_busy(UV_EVENT_DBENGINE_QUERY);
PDC *pdc = data;
- rrdeng_prep_query(pdc);
+ rrdeng_prep_query(pdc, true);
return data;
}
@@ -1435,21 +1490,28 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX);
count = 0;
while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) {
+ if(journalfile_v2_data_available(datafile->journalfile)) {
+ // journal file v2 is already there for this datafile
+ datafile = datafile->next;
+ continue;
+ }
netdata_spinlock_lock(&datafile->writers.spinlock);
bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
netdata_spinlock_unlock(&datafile->writers.spinlock);
- if(!available)
+ if(!available) {
+ info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ datafile = datafile->next;
continue;
-
- if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) {
- info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
- pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
- journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
- count++;
}
+ info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
+ journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
+
+ count++;
+
datafile = datafile->next;
if (unlikely(!ctx_is_available_for_queries(ctx)))
@@ -1472,10 +1534,6 @@ static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused,
rrdeng_main.evictions_running--;
}
-static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
__atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
@@ -1604,6 +1662,26 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
return true;
}
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ EPDL *epdl = cmd.data;
+
+ if(from_worker)
+ epdl_find_extent_and_populate_pages(ctx, epdl, true);
+ else
+ work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL);
+}
+
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ PDC *pdc = cmd.data;
+
+ if(from_worker)
+ rrdeng_prep_query(pdc, true);
+ else
+ work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL);
+}
+
void dbengine_event_loop(void* arg) {
sanity_check();
uv_thread_set_name_np(pthread_self(), "DBENGINE");
@@ -1661,25 +1739,19 @@ void dbengine_event_loop(void* arg) {
/* wait for commands */
do {
worker_is_busy(RRDENG_OPCODE_MAX);
- cmd = rrdeng_deq_cmd();
+ cmd = rrdeng_deq_cmd(false);
opcode = cmd.opcode;
worker_is_busy(opcode);
switch (opcode) {
- case RRDENG_OPCODE_EXTENT_READ: {
- struct rrdengine_instance *ctx = cmd.ctx;
- EPDL *epdl = cmd.data;
- work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read);
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, false);
break;
- }
- case RRDENG_OPCODE_QUERY: {
- struct rrdengine_instance *ctx = cmd.ctx;
- PDC *pdc = cmd.data;
- work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query);
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, false);
break;
- }
case RRDENG_OPCODE_EXTENT_WRITE: {
struct rrdengine_instance *ctx = cmd.ctx;