summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:34 +0000
commitd079b656b4719739b2247dcd9d46e9bec793095a (patch)
treed2c950c70a776bcf697c963151c5bd959f8a9f03 /database/engine/rrdengine.c
parentReleasing debian version 1.37.1-2. (diff)
downloadnetdata-d079b656b4719739b2247dcd9d46e9bec793095a.tar.xz
netdata-d079b656b4719739b2247dcd9d46e9bec793095a.zip
Merging upstream version 1.38.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c2634
1 files changed, 1465 insertions, 1169 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index a6840f38..d64868f0 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -2,6 +2,7 @@
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"
+#include "pdc.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
@@ -11,31 +12,74 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
#endif
-void *dbengine_page_alloc() {
- void *page = NULL;
- if (unlikely(db_engine_use_malloc))
- page = mallocz(RRDENG_BLOCK_SIZE);
- else {
- page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm);
- if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()");
- }
- return page;
-}
-
-void dbengine_page_free(void *page) {
- if (unlikely(db_engine_use_malloc))
- freez(page);
- else
- netdata_munmap(page, RRDENG_BLOCK_SIZE);
-}
+struct rrdeng_main {
+ uv_thread_t thread;
+ uv_loop_t loop;
+ uv_async_t async;
+ uv_timer_t timer;
+ pid_t tid;
+
+ size_t flushes_running;
+ size_t evictions_running;
+ size_t cleanup_running;
+
+ struct {
+ ARAL *ar;
+
+ struct {
+ SPINLOCK spinlock;
+
+ size_t waiting;
+ struct rrdeng_cmd *waiting_items_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
+ size_t executed_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
+ } unsafe;
+ } cmd_queue;
+
+ struct {
+ ARAL *ar;
+
+ struct {
+ size_t dispatched;
+ size_t executing;
+ size_t pending_cb;
+ } atomics;
+ } work_cmd;
+
+ struct {
+ ARAL *ar;
+ } handles;
+
+ struct {
+ ARAL *ar;
+ } descriptors;
+
+ struct {
+ ARAL *ar;
+ } xt_io_descr;
+
+} rrdeng_main = {
+ .thread = 0,
+ .loop = {},
+ .async = {},
+ .timer = {},
+ .flushes_running = 0,
+ .evictions_running = 0,
+ .cleanup_running = 0,
+
+ .cmd_queue = {
+ .unsafe = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ },
+ }
+};
static void sanity_check(void)
{
- BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2));
+ BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2));
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
@@ -54,519 +98,489 @@ static void sanity_check(void)
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
/* extent cache count must fit in 32 bits */
- BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
+// BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
/* page info scratch space must be able to hold 2 32-bit integers */
BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
}
-/* always inserts into tail */
-static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc,
- struct extent_cache_element *xt_cache_elem)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
+// ----------------------------------------------------------------------------
+// work request cache
- xt_cache_elem->prev = NULL;
- xt_cache_elem->next = NULL;
+typedef void *(*work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req);
+typedef void (*after_work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req, int status);
- if (likely(NULL != xt_cache->replaceQ_tail)) {
- xt_cache_elem->prev = xt_cache->replaceQ_tail;
- xt_cache->replaceQ_tail->next = xt_cache_elem;
- }
- if (unlikely(NULL == xt_cache->replaceQ_head)) {
- xt_cache->replaceQ_head = xt_cache_elem;
- }
- xt_cache->replaceQ_tail = xt_cache_elem;
+struct rrdeng_work {
+ uv_work_t req;
+
+ struct rrdengine_instance *ctx;
+ void *data;
+ struct completion *completion;
+
+ work_cb work_cb;
+ after_work_cb after_work_cb;
+ enum rrdeng_opcode opcode;
+};
+
+static void work_request_init(void) {
+ rrdeng_main.work_cmd.ar = aral_create(
+ "dbengine-work-cmd",
+ sizeof(struct rrdeng_work),
+ 0,
+ 65536, NULL,
+ NULL, NULL, false, false
+ );
}
-static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc,
- struct extent_cache_element *xt_cache_elem)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *prev, *next;
+static inline bool work_request_full(void) {
+ return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS);
+}
- prev = xt_cache_elem->prev;
- next = xt_cache_elem->next;
+static inline void work_done(struct rrdeng_work *work_request) {
+ aral_freez(rrdeng_main.work_cmd.ar, work_request);
+}
- if (likely(NULL != prev)) {
- prev->next = next;
- }
- if (likely(NULL != next)) {
- next->prev = prev;
- }
- if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) {
- xt_cache->replaceQ_head = next;
- }
- if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) {
- xt_cache->replaceQ_tail = prev;
- }
- xt_cache_elem->prev = xt_cache_elem->next = NULL;
+static void work_standard_worker(uv_work_t *req) {
+ __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
+
+ register_libuv_worker_jobs();
+ worker_is_busy(UV_EVENT_WORKER_INIT);
+
+ struct rrdeng_work *work_request = req->data;
+ work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
+ worker_is_idle();
+
+ __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
+
+ // signal the event loop a worker is available
+ fatal_assert(0 == uv_async_send(&rrdeng_main.async));
}
-static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc,
- struct extent_cache_element *xt_cache_elem)
-{
- xt_cache_replaceQ_delete(wc, xt_cache_elem);
- xt_cache_replaceQ_insert(wc, xt_cache_elem);
+static void after_work_standard_callback(uv_work_t* req, int status) {
+ struct rrdeng_work *work_request = req->data;
+
+ worker_is_busy(RRDENG_OPCODE_MAX + work_request->opcode);
+
+ if(work_request->after_work_cb)
+ work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
+
+ work_done(work_request);
+ __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
+
+ worker_is_idle();
}
-/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */
-static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *xt_cache_elem;
- unsigned idx;
- int ret;
+static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) {
+ struct rrdeng_work *work_request = NULL;
- ret = find_first_zero(xt_cache->allocation_bitmap);
- if (-1 == ret || ret >= MAX_CACHED_EXTENTS) {
- for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) {
- idx = xt_cache_elem - xt_cache->extent_array;
- if (!check_bit(xt_cache->inflight_bitmap, idx)) {
- xt_cache_replaceQ_delete(wc, xt_cache_elem);
- break;
- }
- }
- if (NULL == xt_cache_elem)
- return -1;
- } else {
- idx = (unsigned)ret;
- xt_cache_elem = &xt_cache->extent_array[idx];
+ internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread");
+
+ work_request = aral_mallocz(rrdeng_main.work_cmd.ar);
+ memset(work_request, 0, sizeof(struct rrdeng_work));
+ work_request->req.data = work_request;
+ work_request->ctx = ctx;
+ work_request->data = data;
+ work_request->completion = completion;
+ work_request->work_cb = work_cb;
+ work_request->after_work_cb = after_work_cb;
+ work_request->opcode = opcode;
+
+ if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
+ internal_fatal(true, "DBENGINE: cannot queue work");
+ work_done(work_request);
+ return false;
}
- xt_cache_elem->extent = extent;
- xt_cache_elem->fileno = extent->datafile->fileno;
- xt_cache_elem->inflight_io_descr = NULL;
- xt_cache_replaceQ_insert(wc, xt_cache_elem);
- modify_bit(&xt_cache->allocation_bitmap, idx, 1);
- return (int)idx;
+ __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
+
+ return true;
}
-/**
- * Returns 0 if the cached extent was found in the extent cache, 1 otherwise.
- * Sets *idx to point to the position of the extent inside the cache.
- **/
-static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *xt_cache_elem;
- unsigned i;
+// ----------------------------------------------------------------------------
+// page descriptor cache
+
+void page_descriptors_init(void) {
+ rrdeng_main.descriptors.ar = aral_create(
+ "dbengine-descriptors",
+ sizeof(struct page_descr_with_data),
+ 0,
+ 65536 * 4,
+ NULL,
+ NULL, NULL, false, false);
+}
- for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) {
- xt_cache_elem = &xt_cache->extent_array[i];
- if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent &&
- xt_cache_elem->fileno == extent->datafile->fileno) {
- *idx = i;
- return 0;
- }
- }
- return 1;
+struct page_descr_with_data *page_descriptor_get(void) {
+ struct page_descr_with_data *descr = aral_mallocz(rrdeng_main.descriptors.ar);
+ memset(descr, 0, sizeof(struct page_descr_with_data));
+ return descr;
}
-#if 0 /* disabled code */
-static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *xt_cache_elem;
+static inline void page_descriptor_release(struct page_descr_with_data *descr) {
+ aral_freez(rrdeng_main.descriptors.ar, descr);
+}
- xt_cache_elem = &xt_cache->extent_array[idx];
- xt_cache_replaceQ_delete(wc, xt_cache_elem);
- xt_cache_elem->extent = NULL;
- modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */
- modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */
+// ----------------------------------------------------------------------------
+// extent io descriptor cache
+
+static void extent_io_descriptor_init(void) {
+ rrdeng_main.xt_io_descr.ar = aral_create(
+ "dbengine-extent-io",
+ sizeof(struct extent_io_descriptor),
+ 0,
+ 65536,
+ NULL,
+ NULL, NULL, false, false
+ );
}
-#endif
-void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx,
- struct extent_io_descriptor *xt_io_descr)
-{
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *xt_cache_elem;
- struct extent_io_descriptor *old_next;
+static struct extent_io_descriptor *extent_io_descriptor_get(void) {
+ struct extent_io_descriptor *xt_io_descr = aral_mallocz(rrdeng_main.xt_io_descr.ar);
+ memset(xt_io_descr, 0, sizeof(struct extent_io_descriptor));
+ return xt_io_descr;
+}
- xt_cache_elem = &xt_cache->extent_array[idx];
- old_next = xt_cache_elem->inflight_io_descr->next;
- xt_cache_elem->inflight_io_descr->next = xt_io_descr;
- xt_io_descr->next = old_next;
+static inline void extent_io_descriptor_release(struct extent_io_descriptor *xt_io_descr) {
+ aral_freez(rrdeng_main.xt_io_descr.ar, xt_io_descr);
}
-void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr)
-{
- unsigned i, j, page_offset;
- struct rrdengine_instance *ctx = wc->ctx;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
- void *page;
- struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
-
- for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
- page = dbengine_page_alloc();
- descr = xt_io_descr->descr_array[i];
- for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
- /* care, we don't hold the descriptor mutex */
- if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
- extent->pages[j]->page_length == descr->page_length &&
- extent->pages[j]->start_time_ut == descr->start_time_ut &&
- extent->pages[j]->end_time_ut == descr->end_time_ut) {
- break;
- }
- page_offset += extent->pages[j]->page_length;
+// ----------------------------------------------------------------------------
+// query handle cache
+
+void rrdeng_query_handle_init(void) {
+ rrdeng_main.handles.ar = aral_create(
+ "dbengine-query-handles",
+ sizeof(struct rrdeng_query_handle),
+ 0,
+ 65536,
+ NULL,
+ NULL, NULL, false, false);
+}
- }
- /* care, we don't hold the descriptor mutex */
- (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length);
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- pg_cache_descr->page = page;
- pg_cache_descr->flags |= RRD_PAGE_POPULATED;
- pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- pg_cache_replaceQ_insert(ctx, descr);
- if (xt_io_descr->release_descr) {
- pg_cache_put(ctx, descr);
- } else {
- debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
- pg_cache_wake_up_waiters(ctx, descr);
- }
+struct rrdeng_query_handle *rrdeng_query_handle_get(void) {
+ struct rrdeng_query_handle *handle = aral_mallocz(rrdeng_main.handles.ar);
+ memset(handle, 0, sizeof(struct rrdeng_query_handle));
+ return handle;
+}
+
+void rrdeng_query_handle_release(struct rrdeng_query_handle *handle) {
+ aral_freez(rrdeng_main.handles.ar, handle);
+}
+
+// ----------------------------------------------------------------------------
+// WAL cache
+
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ WAL *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ } atomics;
+} wal_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ },
+};
+
+static void wal_cleanup1(void) {
+ WAL *wal = NULL;
+
+ if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock))
+ return;
+
+ if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) {
+ wal = wal_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
+ wal_globals.protected.available--;
}
- if (xt_io_descr->completion)
- completion_mark_complete(xt_io_descr->completion);
- freez(xt_io_descr);
-}
-
-static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) {
- switch(type) {
- case PAGE_METRICS: {
- storage_number n = pack_storage_number(NAN, SN_FLAG_NONE);
- storage_number *array = (storage_number *)page;
- size_t slots = page_length / sizeof(n);
- for(size_t i = 0; i < slots ; i++)
- array[i] = n;
- }
- break;
-
- case PAGE_TIER: {
- storage_number_tier1_t n = {
- .min_value = NAN,
- .max_value = NAN,
- .sum_value = NAN,
- .count = 1,
- .anomaly_count = 0,
- };
- storage_number_tier1_t *array = (storage_number_tier1_t *)page;
- size_t slots = page_length / sizeof(n);
- for(size_t i = 0; i < slots ; i++)
- array[i] = n;
- }
- break;
- default: {
- static bool logged = false;
- if(!logged) {
- error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type);
- logged = true;
- }
- memset(page, 0, page_length);
- }
+ netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+
+ if(wal) {
+ posix_memfree(wal->buf);
+ freez(wal);
+ __atomic_sub_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
}
}
-struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s)
-{
- uv_rwlock_rdlock(&page_index->lock);
- Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0);
- struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue;
- uv_rwlock_rdunlock(&page_index->lock);
- return descr;
-};
+WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
+ if(!size || size > RRDENG_BLOCK_SIZE)
+ fatal("DBENGINE: invalid WAL size requested");
-static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
-{
- struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
- int ret;
- unsigned i, j, count;
- void *page, *uncompressed_buf = NULL;
- uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0;
- uint8_t have_read_error = 0;
- /* persistent structures */
- struct rrdeng_df_extent_header *header;
- struct rrdeng_df_extent_trailer *trailer;
- uLong crc;
+ WAL *wal = NULL;
- header = xt_io_descr->buf;
- payload_length = header->payload_length;
- count = header->number_of_pages;
- payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
- trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
-
- if (unlikely(read_failed)) {
- struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
-
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
- have_read_error = 1;
- error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos,
- xt_io_descr->bytes, datafile->tier, datafile->fileno);
- goto after_crc_check;
+ netdata_spinlock_lock(&wal_globals.protected.spinlock);
+
+ if(likely(wal_globals.protected.available_items)) {
+ wal = wal_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
+ wal_globals.protected.available--;
}
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
-#ifdef NETDATA_INTERNAL_CHECKS
- {
- struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
- xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+
+ uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED);
+ netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+
+ if(unlikely(!wal)) {
+ wal = mallocz(sizeof(WAL));
+ wal->buf_size = RRDENG_BLOCK_SIZE;
+ int ret = posix_memalign((void *)&wal->buf, RRDFILE_ALIGNMENT, wal->buf_size);
+ if (unlikely(ret))
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
+ __atomic_add_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
}
-#endif
- if (unlikely(ret)) {
- struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
- have_read_error = 1;
- error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__,
- xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ // these need to survive
+ unsigned buf_size = wal->buf_size;
+ void *buf = wal->buf;
+
+ memset(wal, 0, sizeof(WAL));
+
+ // put them back
+ wal->buf_size = buf_size;
+ wal->buf = buf;
+
+ memset(wal->buf, 0, wal->buf_size);
+
+ wal->transaction_id = transaction_id;
+ wal->size = size;
+
+ return wal;
+}
+
+void wal_release(WAL *wal) {
+ if(unlikely(!wal)) return;
+
+ netdata_spinlock_lock(&wal_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
+ wal_globals.protected.available++;
+ netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+}
+
+// ----------------------------------------------------------------------------
+// command queue cache
+
+struct rrdeng_cmd {
+ struct rrdengine_instance *ctx;
+ enum rrdeng_opcode opcode;
+ void *data;
+ struct completion *completion;
+ enum storage_priority priority;
+ dequeue_callback_t dequeue_cb;
+
+ struct {
+ struct rrdeng_cmd *prev;
+ struct rrdeng_cmd *next;
+ } queue;
+};
+
+static void rrdeng_cmd_queue_init(void) {
+ rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
+ sizeof(struct rrdeng_cmd),
+ 0,
+ 65536,
+ NULL,
+ NULL, NULL, false, false);
+}
+
+static inline STORAGE_PRIORITY rrdeng_enq_cmd_map_opcode_to_priority(enum rrdeng_opcode opcode, STORAGE_PRIORITY priority) {
+ if(unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE))
+ priority = STORAGE_PRIORITY_BEST_EFFORT;
+
+ switch(opcode) {
+ case RRDENG_OPCODE_QUERY:
+ priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
+ break;
+
+ default:
+ break;
}
-after_crc_check:
- if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
- uncompressed_payload_length = 0;
- for (i = 0 ; i < count ; ++i) {
- uncompressed_payload_length += header->descr[i].page_length;
+ return priority;
+}
+
+void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd) {
+ epdl_cmd_queued(cmd->data, cmd);
+}
+
+void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) {
+ epdl_cmd_dequeued(cmd->data);
+}
+
+void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) {
+ netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+
+ struct rrdeng_cmd *cmd = get_cmd_cb(data);
+ if(cmd) {
+ priority = rrdeng_enq_cmd_map_opcode_to_priority(cmd->opcode, priority);
+
+ if (cmd->priority > priority) {
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[cmd->priority], cmd, queue.prev, queue.next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
+ cmd->priority = priority;
}
- uncompressed_buf = mallocz(uncompressed_payload_length);
- ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
- payload_length, uncompressed_payload_length);
- ctx->stats.before_decompress_bytes += payload_length;
- ctx->stats.after_decompress_bytes += ret;
- debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
- /* care, we don't hold the descriptor mutex */
}
- {
- uint8_t xt_is_cached = 0;
- unsigned xt_idx;
- struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
-
- xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
- if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) {
- struct extent_cache *xt_cache = &wc->xt_cache;
- struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx];
- struct extent_io_descriptor *curr, *next;
-
- if (have_read_error) {
- memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages));
- } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length);
- } else {
- (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length);
- }
- /* complete all connected in-flight read requests */
- for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) {
- next = curr->next;
- read_cached_extent_cb(wc, xt_idx, curr);
+
+ netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+}
+
+void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion,
+ enum storage_priority priority, enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb) {
+
+ priority = rrdeng_enq_cmd_map_opcode_to_priority(opcode, priority);
+
+ struct rrdeng_cmd *cmd = aral_mallocz(rrdeng_main.cmd_queue.ar);
+ memset(cmd, 0, sizeof(struct rrdeng_cmd));
+ cmd->ctx = ctx;
+ cmd->opcode = opcode;
+ cmd->data = data;
+ cmd->completion = completion;
+ cmd->priority = priority;
+ cmd->dequeue_cb = dequeue_cb;
+
+ netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
+ rrdeng_main.cmd_queue.unsafe.waiting++;
+ if(enqueue_cb)
+ enqueue_cb(cmd);
+ netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+
+ fatal_assert(0 == uv_async_send(&rrdeng_main.async));
+}
+
+static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PRIORITY priority, STORAGE_PRIORITY max_priority) {
+ for(; priority <= max_priority ; priority++)
+ if(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority])
+ return true;
+
+ return false;
+}
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
+ struct rrdeng_cmd *cmd = NULL;
+
+ STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT;
+
+ // find an opcode to execute from the queue
+ netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) {
+ cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
+ if(cmd) {
+
+ // avoid starvation of lower priorities
+ if(unlikely(priority >= STORAGE_PRIORITY_HIGH &&
+ priority < STORAGE_PRIORITY_BEST_EFFORT &&
+ ++rrdeng_main.cmd_queue.unsafe.executed_by_priority[priority] % 50 == 0 &&
+ rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(priority + 1, max_priority))) {
+ // let the others run 2% of the requests
+ cmd = NULL;
+ continue;
}
- xt_cache_elem->inflight_io_descr = NULL;
- modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */
+
+ // remove it from the queue
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
+ rrdeng_main.cmd_queue.unsafe.waiting--;
+ break;
}
}
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t));
- struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
-
-
- for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) {
- uint8_t is_prefetched_page;
- descr = NULL;
- for (j = 0 ; j < xt_io_descr->descr_count; ++j) {
- struct rrdeng_page_descr descrj;
-
- descrj = xt_io_descr->descr_read_array[j];
- /* care, we don't hold the descriptor mutex */
- if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) &&
- header->descr[i].page_length == descrj.page_length &&
- header->descr[i].start_time_ut == descrj.start_time_ut &&
- header->descr[i].end_time_ut == descrj.end_time_ut) {
- //descr = descrj;
- descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC));
- if (unlikely(!descr)) {
- error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__);
- }
- break;
- }
- }
- is_prefetched_page = 0;
- if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
- descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
- header->descr[i].start_time_ut);
- if (!descr)
- continue; /* Failed to reserve a suitable page */
- is_prefetched_page = 1;
- }
- page = dbengine_page_alloc();
-
- /* care, we don't hold the descriptor mutex */
- if (have_read_error) {
- fill_page_with_nulls(page, descr->page_length, descr->type);
- } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
- } else {
- (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
- }
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- pg_cache_descr->page = page;
- pg_cache_descr->flags |= RRD_PAGE_POPULATED;
- pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- pg_cache_replaceQ_insert(ctx, descr);
- if (xt_io_descr->release_descr || is_prefetched_page) {
- pg_cache_put(ctx, descr);
- } else {
- debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
- pg_cache_wake_up_waiters(ctx, descr);
- }
+ if(cmd && cmd->dequeue_cb) {
+ cmd->dequeue_cb(cmd);
+ cmd->dequeue_cb = NULL;
}
- if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
- freez(uncompressed_buf);
+
+ netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+
+ struct rrdeng_cmd ret;
+ if(cmd) {
+ // copy it, to return it
+ ret = *cmd;
+
+ aral_freez(rrdeng_main.cmd_queue.ar, cmd);
}
- if (xt_io_descr->completion)
- completion_mark_complete(xt_io_descr->completion);
+ else
+ ret = (struct rrdeng_cmd) {
+ .ctx = NULL,
+ .opcode = RRDENG_OPCODE_NOOP,
+ .priority = STORAGE_PRIORITY_BEST_EFFORT,
+ .completion = NULL,
+ .data = NULL,
+ };
+
+ return ret;
}
-static void read_extent_cb(uv_fs_t *req)
-{
- struct rrdengine_worker_config *wc = req->loop->data;
- struct extent_io_descriptor *xt_io_descr;
- xt_io_descr = req->data;
- do_extent_processing(wc, xt_io_descr, req->result < 0);
- uv_fs_req_cleanup(req);
- posix_memfree(xt_io_descr->buf);
- freez(xt_io_descr);
+// ----------------------------------------------------------------------------
+
+struct {
+ ARAL *aral[RRD_STORAGE_TIERS];
+} dbengine_page_alloc_globals = {};
+
+static inline ARAL *page_size_lookup(size_t size) {
+ for(size_t tier = 0; tier < storage_tiers ;tier++)
+ if(size == tier_page_size[tier])
+ return dbengine_page_alloc_globals.aral[tier];
+
+ return NULL;
}
-static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused)
-{
- struct rrdengine_worker_config *wc = req->loop->data;
- struct rrdengine_instance *ctx = wc->ctx;
- struct extent_io_descriptor *xt_io_descr;
- xt_io_descr = req->data;
+static void dbengine_page_alloc_init(void) {
+ for(size_t i = storage_tiers; i > 0 ;i--) {
+ size_t tier = storage_tiers - i;
- if (likely(xt_io_descr->map_base)) {
- do_extent_processing(wc, xt_io_descr, false);
- munmap(xt_io_descr->map_base, xt_io_descr->map_length);
- freez(xt_io_descr);
- return;
- }
+ char buf[20 + 1];
+ snprintfz(buf, 20, "tier%zu-pages", tier);
- // MMAP failed, so do uv_fs_read
- int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ dbengine_page_alloc_globals.aral[tier] = aral_create(
+ buf,
+ tier_page_size[tier],
+ 64,
+ 512 * tier_page_size[tier],
+ pgc_aral_statistics(),
+ NULL, NULL, false, false);
}
- unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes);
- xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
- xt_io_descr->req.data = xt_io_descr;
- ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb);
- fatal_assert(-1 != ret);
- ctx->stats.io_read_bytes += real_io_size;
- ctx->stats.io_read_extent_bytes += real_io_size;
}
-static void do_mmap_read_extent(uv_work_t *req)
-{
- struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data;
- struct rrdengine_worker_config *wc = req->loop->data;
- struct rrdengine_instance *ctx = wc->ctx;
-
- off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos);
- size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start;
- unsigned real_io_size = xt_io_descr->bytes;
-
- void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start);
- if (likely(data != MAP_FAILED)) {
- xt_io_descr->map_base = data;
- xt_io_descr->map_length = length;
- xt_io_descr->buf = data + (xt_io_descr->pos - map_start);
- ctx->stats.io_read_bytes += real_io_size;
- ctx->stats.io_read_extent_bytes += real_io_size;
- }
+void *dbengine_page_alloc(size_t size) {
+ ARAL *ar = page_size_lookup(size);
+ if(ar) return aral_mallocz(ar);
+
+ return mallocz(size);
}
-static void do_read_extent(struct rrdengine_worker_config* wc,
- struct rrdeng_page_descr **descr,
- unsigned count,
- uint8_t release_descr)
-{
- struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache_descr *pg_cache_descr;
- int ret;
- unsigned i, size_bytes, pos;
- struct extent_io_descriptor *xt_io_descr;
- struct rrdengine_datafile *datafile;
- struct extent_info *extent = descr[0]->extent;
- uint8_t xt_is_cached = 0, xt_is_inflight = 0;
- unsigned xt_idx;
-
- datafile = extent->datafile;
- pos = extent->offset;
- size_bytes = extent->size;
-
- xt_io_descr = callocz(1, sizeof(*xt_io_descr));
- for (i = 0 ; i < count; ++i) {
- rrdeng_page_descr_mutex_lock(ctx, descr[i]);
- pg_cache_descr = descr[i]->pg_cache_descr;
- pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
- rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
- xt_io_descr->descr_array[i] = descr[i];
- xt_io_descr->descr_read_array[i] = *(descr[i]);
- }
- xt_io_descr->descr_count = count;
- xt_io_descr->file = datafile->file;
- xt_io_descr->bytes = size_bytes;
- xt_io_descr->pos = pos;
- xt_io_descr->req_worker.data = xt_io_descr;
- xt_io_descr->completion = NULL;
- xt_io_descr->release_descr = release_descr;
- xt_io_descr->buf = NULL;
-
- xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
- if (xt_is_cached) {
- xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]);
- xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx);
- if (xt_is_inflight) {
- enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr);
- return;
- }
- return read_cached_extent_cb(wc, xt_idx, xt_io_descr);
- } else {
- ret = try_insert_into_xt_cache(wc, extent);
- if (-1 != ret) {
- xt_idx = (unsigned)ret;
- modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1);
- wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr;
- }
- }
+void dbengine_page_free(void *page, size_t size __maybe_unused) {
+ if(unlikely(!page || page == DBENGINE_EMPTY_PAGE))
+ return;
- ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb);
- fatal_assert(-1 != ret);
+ ARAL *ar = page_size_lookup(size);
+ if(ar)
+ aral_freez(ar, page);
+ else
+ freez(page);
+}
- ++ctx->stats.io_read_requests;
- ++ctx->stats.io_read_extents;
- ctx->stats.pg_cache_backfills += count;
+// ----------------------------------------------------------------------------
+
+void *dbengine_extent_alloc(size_t size) {
+ void *extent = mallocz(size);
+ return extent;
}
-static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr)
-{
- struct rrdengine_instance *ctx = wc->ctx;
+void dbengine_extent_free(void *extent, size_t size __maybe_unused) {
+ freez(extent);
+}
+
+static void journalfile_extent_build(struct rrdengine_instance *ctx, struct extent_io_descriptor *xt_io_descr) {
unsigned count, payload_length, descr_size, size_bytes;
void *buf;
/* persistent structures */
@@ -582,12 +596,13 @@ static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent
payload_length = sizeof(*jf_metric_data) + descr_size;
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
- buf = wal_get_transaction_buffer(wc, size_bytes);
+ xt_io_descr->wal = wal_get(ctx, size_bytes);
+ buf = xt_io_descr->wal->buf;
jf_header = buf;
jf_header->type = STORE_DATA;
jf_header->reserved = 0;
- jf_header->id = ctx->commit_log.transaction_id++;
+ jf_header->id = xt_io_descr->wal->transaction_id;
jf_header->payload_length = payload_length;
jf_metric_data = buf + sizeof(*jf_header);
@@ -602,265 +617,210 @@ static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent
crc32set(jf_trailer->checksum, crc);
}
-static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data)
-{
- switch (type) {
- case STORE_DATA:
- commit_data_extent(wc, (struct extent_io_descriptor *)data);
- break;
- default:
- fatal_assert(type == STORE_DATA);
- break;
- }
-}
+static void after_extent_flushed_to_open(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ if(completion)
+ completion_mark_complete(completion);
-static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
-{
- int error;
-
- error = uv_thread_join(wc->now_invalidating_dirty_pages);
- if (error) {
- error("uv_thread_join(): %s", uv_strerror(error));
- }
- freez(wc->now_invalidating_dirty_pages);
- wc->now_invalidating_dirty_pages = NULL;
- wc->cleanup_thread_invalidating_dirty_pages = 0;
+ if(ctx_is_available_for_queries(ctx))
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
}
-static void invalidate_oldest_committed(void *arg)
-{
- struct rrdengine_instance *ctx = arg;
- struct rrdengine_worker_config *wc = &ctx->worker_config;
- struct page_cache *pg_cache = &ctx->pg_cache;
- int ret;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
- Pvoid_t *PValue;
- Word_t Index;
- unsigned nr_committed_pages;
+static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_FLUSHED_TO_OPEN);
- do {
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- for (Index = 0,
- PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ uv_fs_t *uv_fs_request = data;
+ struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
+ struct page_descr_with_data *descr;
+ struct rrdengine_datafile *datafile;
+ unsigned i;
- descr != NULL;
+ datafile = xt_io_descr->datafile;
- PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
- fatal_assert(0 != descr->page_length);
+ bool still_running = ctx_is_available_for_queries(ctx);
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+ for (i = 0 ; i < xt_io_descr->descr_count ; ++i) {
+ descr = xt_io_descr->descr_array[i];
- ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
- fatal_assert(1 == ret);
- break;
- }
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- }
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ if (likely(still_running))
+ pgc_open_add_hot_page(
+ (Word_t)ctx, descr->metric_id,
+ (time_t) (descr->start_time_ut / USEC_PER_SEC),
+ (time_t) (descr->end_time_ut / USEC_PER_SEC),
+ descr->update_every_s,
+ datafile,
+ xt_io_descr->pos, xt_io_descr->bytes, descr->page_length);
- if (!descr) {
- info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+ page_descriptor_release(descr);
+ }
- goto out;
- }
- pg_cache_punch_hole(ctx, descr, 1, 1, NULL);
+ uv_fs_req_cleanup(uv_fs_request);
+ posix_memfree(xt_io_descr->buf);
+ extent_io_descriptor_release(xt_io_descr);
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
- rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
- rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ datafile->writers.flushed_to_open_running--;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
- } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
-out:
- wc->cleanup_thread_invalidating_dirty_pages = 1;
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&wc->async));
-}
+ if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running)
+ // we just finished a flushing on a datafile that is not the active one
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
-void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
-{
- struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned nr_committed_pages;
- int error;
+ return data;
+}
- if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */
- return;
+// Main event loop callback
+static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
+ worker_is_busy(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE);
- uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
- nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+ struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
+ struct rrdengine_datafile *datafile = xt_io_descr->datafile;
+ struct rrdengine_instance *ctx = datafile->ctx;
- if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
- /* delete the oldest page in memory */
- if (wc->now_invalidating_dirty_pages) {
- /* already deleting a page */
- return;
- }
- errno = 0;
- error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
- "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
-
- wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
- wc->cleanup_thread_invalidating_dirty_pages = 0;
-
- error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
- if (error) {
- error("uv_thread_create(): %s", uv_strerror(error));
- freez(wc->now_invalidating_dirty_pages);
- wc->now_invalidating_dirty_pages = NULL;
- }
+ if (uv_fs_request->result < 0) {
+ ctx_io_error(ctx);
+ error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
}
+
+ journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop);
+
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ datafile->writers.running--;
+ datafile->writers.flushed_to_open_running++;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+
+ rrdeng_enq_cmd(xt_io_descr->ctx,
+ RRDENG_OPCODE_FLUSHED_TO_OPEN,
+ uv_fs_request,
+ xt_io_descr->completion,
+ STORAGE_PRIORITY_INTERNAL_DBENGINE,
+ NULL,
+ NULL);
+
+ worker_is_idle();
}
-void flush_pages_cb(uv_fs_t* req)
-{
- struct rrdengine_worker_config* wc = req->loop->data;
- struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct extent_io_descriptor *xt_io_descr;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
- unsigned i, count;
-
- xt_io_descr = req->data;
- if (req->result < 0) {
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
- error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
- }
-#ifdef NETDATA_INTERNAL_CHECKS
- {
- struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
- __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
- }
-#endif
- count = xt_io_descr->descr_count;
- for (i = 0 ; i < count ; ++i) {
- /* care, we don't hold the descriptor mutex */
- descr = xt_io_descr->descr_array[i];
+static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
+ bool ret = false;
+ netdata_spinlock_lock(&datafile->writers.spinlock);
- pg_cache_replaceQ_insert(ctx, descr);
+ if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx))
+ ret = true;
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING);
- /* wake up waiters, care no reference being held */
- pg_cache_wake_up_waiters_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- }
- if (xt_io_descr->completion)
- completion_mark_complete(xt_io_descr->completion);
- uv_fs_req_cleanup(req);
- posix_memfree(xt_io_descr->buf);
- freez(xt_io_descr);
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+
+ return ret;
+}
+
+static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_instance *ctx) {
+ struct rrdengine_datafile *datafile;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- pg_cache->committed_page_index.nr_committed_pages -= count;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
- wc->inflight_dirty_pages -= count;
+ // get the latest datafile
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ datafile = ctx->datafiles.first->prev;
+ // become a writer on this datafile, to prevent it from vanishing
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ datafile->writers.running++;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ if(datafile_is_full(ctx, datafile)) {
+ // remember the datafile we have become writers to
+ struct rrdengine_datafile *old_datafile = datafile;
+
+ // only 1 datafile creation at a time
+ static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+ netdata_mutex_lock(&mutex);
+
+ // take the latest datafile again - without this, multiple threads may create multiple files
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ datafile = ctx->datafiles.first->prev;
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0)
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL,
+ NULL);
+
+ netdata_mutex_unlock(&mutex);
+
+ // get the new latest datafile again, like above
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ datafile = ctx->datafiles.first->prev;
+ // become a writer on this datafile, to prevent it from vanishing
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ datafile->writers.running++;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ // release the writers on the old datafile
+ netdata_spinlock_lock(&old_datafile->writers.spinlock);
+ old_datafile->writers.running--;
+ netdata_spinlock_unlock(&old_datafile->writers.spinlock);
+ }
+
+ return datafile;
}
/*
- * completion must be NULL or valid.
- * Returns 0 when no flushing can take place.
- * Returns datafile bytes to be written on successful flushing initiation.
+ * Take a page list in a judy array and write them
*/
-static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion)
-{
- struct rrdengine_instance *ctx = wc->ctx;
- struct page_cache *pg_cache = &ctx->pg_cache;
+static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
int ret;
int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
uint32_t uncompressed_payload_length, payload_offset;
- struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
- struct page_cache_descr *pg_cache_descr;
+ struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *xt_io_descr;
+ struct extent_buffer *eb = NULL;
void *compressed_buf = NULL;
- Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
- Pvoid_t *PValue;
Word_t Index;
- uint8_t compression_algorithm = ctx->global_compress_alg;
- struct extent_info *extent;
+ uint8_t compression_algorithm = ctx->config.global_compress_alg;
struct rrdengine_datafile *datafile;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
- if (force) {
- debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
- }
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- for (Index = 0, count = 0, uncompressed_payload_length = 0,
- PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue ;
-
- descr != NULL && count != rrdeng_pages_per_extent;
-
- PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
- uint8_t page_write_pending;
-
- fatal_assert(0 != descr->page_length);
- page_write_pending = 0;
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) {
- page_write_pending = 1;
- /* care, no reference being held */
- pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING;
- uncompressed_payload_length += descr->page_length;
- descr_commit_idx_array[count] = Index;
- eligible_pages[count++] = descr;
- }
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+ for(descr = base, Index = 0, count = 0, uncompressed_payload_length = 0;
+ descr && count != rrdeng_pages_per_extent;
+ descr = descr->link.next, Index++) {
+
+ uncompressed_payload_length += descr->page_length;
+ eligible_pages[count++] = descr;
- if (page_write_pending) {
- ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
- fatal_assert(1 == ret);
- }
}
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (!count) {
- debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
if (completion)
completion_mark_complete(completion);
- return 0;
+
+ __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
+ return NULL;
}
- wc->inflight_dirty_pages += count;
- xt_io_descr = mallocz(sizeof(*xt_io_descr));
+ xt_io_descr = extent_io_descriptor_get();
+ xt_io_descr->ctx = ctx;
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
switch (compression_algorithm) {
- case RRD_NO_COMPRESSION:
- size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
- break;
- default: /* Compress */
- fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
- max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
- compressed_buf = mallocz(max_compressed_size);
- size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
- break;
+ case RRD_NO_COMPRESSION:
+ size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
+ break;
+
+ default: /* Compress */
+ fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
+ eb = extent_buffer_get(max_compressed_size);
+ compressed_buf = eb->data;
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
+ break;
}
+
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
/* freez(xt_io_descr);*/
}
memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
- (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
+ (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct page_descr_with_data *) * count);
xt_io_descr->descr_count = count;
pos = 0;
@@ -869,17 +829,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
header->number_of_pages = count;
pos += sizeof(*header);
- extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
- datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */
- extent->offset = datafile->pos;
- extent->number_of_pages = count;
- extent->datafile = datafile;
- extent->next = NULL;
-
for (i = 0 ; i < count ; ++i) {
- /* This is here for performance reasons */
- xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i];
-
descr = xt_io_descr->descr_array[i];
header->descr[i].type = descr->type;
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
@@ -890,35 +840,40 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
}
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
- /* care, we don't hold the descriptor mutex */
- (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length);
- descr->extent = extent;
- extent->pages[i] = descr;
-
+ (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
pos += descr->page_length;
}
- df_extent_insert(extent);
- switch (compression_algorithm) {
- case RRD_NO_COMPRESSION:
- header->payload_length = uncompressed_payload_length;
- break;
- default: /* Compress */
- compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf,
- uncompressed_payload_length, max_compressed_size);
- ctx->stats.before_compress_bytes += uncompressed_payload_length;
- ctx->stats.after_compress_bytes += compressed_size;
- debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
+ if(likely(compression_algorithm == RRD_LZ4)) {
+ compressed_size = LZ4_compress_default(
+ xt_io_descr->buf + payload_offset,
+ compressed_buf,
+ (int)uncompressed_payload_length,
+ max_compressed_size);
+
+ __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
+
(void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- freez(compressed_buf);
+ extent_buffer_release(eb);
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
header->payload_length = compressed_size;
- break;
}
- extent->size = size_bytes;
- xt_io_descr->bytes = size_bytes;
+ else { // RRD_NO_COMPRESSION
+ header->payload_length = uncompressed_payload_length;
+ }
+
+ real_io_size = ALIGN_BYTES_CEILING(size_bytes);
+
+ datafile = get_datafile_to_write_extent(ctx);
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ xt_io_descr->datafile = datafile;
xt_io_descr->pos = datafile->pos;
- xt_io_descr->req.data = xt_io_descr;
+ datafile->pos += real_io_size;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+
+ xt_io_descr->bytes = size_bytes;
+ xt_io_descr->uv_fs_request.data = xt_io_descr;
xt_io_descr->completion = completion;
trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
@@ -926,324 +881,508 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
crc32set(trailer->checksum, crc);
- real_io_size = ALIGN_BYTES_CEILING(size_bytes);
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
- ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
- fatal_assert(-1 != ret);
- ctx->stats.io_write_bytes += real_io_size;
- ++ctx->stats.io_write_requests;
- ctx->stats.io_write_extent_bytes += real_io_size;
- ++ctx->stats.io_write_extents;
- do_commit_transaction(wc, STORE_DATA, xt_io_descr);
- datafile->pos += ALIGN_BYTES_CEILING(size_bytes);
- ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes);
- rrdeng_test_quota(wc);
+ journalfile_extent_build(ctx, xt_io_descr);
+
+ ctx_last_flush_fileno_set(ctx, datafile->fileno);
+ ctx_current_disk_space_increase(ctx, real_io_size);
+ ctx_io_write_op_bytes(ctx, real_io_size);
- return ALIGN_BYTES_CEILING(size_bytes);
+ return xt_io_descr;
}
-static void after_delete_old_data(struct rrdengine_worker_config* wc)
-{
- struct rrdengine_instance *ctx = wc->ctx;
- struct rrdengine_datafile *datafile;
- struct rrdengine_journalfile *journalfile;
- unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
- int ret, error;
- char path[RRDENG_PATH_MAX];
+static void after_extent_write(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* uv_work_req __maybe_unused, int status __maybe_unused) {
+ struct extent_io_descriptor *xt_io_descr = data;
- datafile = ctx->datafiles.first;
- journalfile = datafile->journalfile;
- datafile_bytes = datafile->pos;
- journalfile_bytes = journalfile->pos;
- deleted_bytes = 0;
+ if(xt_io_descr) {
+ int ret = uv_fs_write(&rrdeng_main.loop,
+ &xt_io_descr->uv_fs_request,
+ xt_io_descr->datafile->file,
+ &xt_io_descr->iov,
+ 1,
+ (int64_t) xt_io_descr->pos,
+ after_extent_write_datafile_io);
- info("Deleting data and journal file pair.");
- datafile_list_delete(ctx, datafile);
- ret = destroy_journal_file(journalfile, datafile);
- if (!ret) {
- generate_journalfilepath(datafile, path, sizeof(path));
- info("Deleted journal file \"%s\".", path);
- deleted_bytes += journalfile_bytes;
- }
- ret = destroy_data_file(datafile);
- if (!ret) {
- generate_datafilepath(datafile, path, sizeof(path));
- info("Deleted data file \"%s\".", path);
- deleted_bytes += datafile_bytes;
+ fatal_assert(-1 != ret);
}
- freez(journalfile);
- freez(datafile);
+}
- ctx->disk_space -= deleted_bytes;
- info("Reclaimed %u bytes of disk space.", deleted_bytes);
+static void *extent_write_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_WRITE);
+ struct page_descr_with_data *base = data;
+ struct extent_io_descriptor *xt_io_descr = datafile_extent_build(ctx, base, completion);
+ return xt_io_descr;
+}
- error = uv_thread_join(wc->now_deleting_files);
- if (error) {
- error("uv_thread_join(): %s", uv_strerror(error));
- }
- freez(wc->now_deleting_files);
- /* unfreeze command processing */
- wc->now_deleting_files = NULL;
+static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ __atomic_store_n(&ctx->atomic.now_deleting_files, false, __ATOMIC_RELAXED);
+}
- wc->cleanup_thread_deleting_files = 0;
- rrdcontext_db_rotation();
+struct uuid_first_time_s {
+ uuid_t *uuid;
+ time_t first_time_s;
+ METRIC *metric;
+ size_t pages_found;
+ size_t df_matched;
+ size_t df_index_oldest;
+};
- /* interrupt event loop */
- uv_stop(wc->loop);
+static int journal_metric_compare(const void *key, const void *metric)
+{
+ return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
}
-static void delete_old_data(void *arg)
-{
- struct rrdengine_instance *ctx = arg;
- struct rrdengine_worker_config* wc = &ctx->worker_config;
- struct rrdengine_datafile *datafile;
- struct extent_info *extent, *next;
- struct rrdeng_page_descr *descr;
- unsigned count, i;
- uint8_t can_delete_metric;
- uuid_t metric_id;
-
- /* Safe to use since it will be deleted after we are done */
- datafile = ctx->datafiles.first;
-
- for (extent = datafile->extents.first ; extent != NULL ; extent = next) {
- count = extent->number_of_pages;
- for (i = 0 ; i < count ; ++i) {
- descr = extent->pages[i];
- can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
- if (unlikely(can_delete_metric)) {
- /*
- * If the metric is empty, has no active writers and if the metadata log has been initialized then
- * attempt to delete the corresponding netdata dimension.
- */
- metaqueue_delete_dimension_uuid(&metric_id);
- }
- }
- next = extent->next;
- freez(extent);
- }
- wc->cleanup_thread_deleting_files = 1;
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&wc->async));
+struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
+
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+
+ struct rrdengine_datafile *next_datafile = datafile->next;
+
+ while(next_datafile && !datafile_acquire(next_datafile, DATAFILE_ACQUIRE_RETENTION))
+ next_datafile = next_datafile->next;
+
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ datafile_release(datafile, DATAFILE_ACQUIRE_RETENTION);
+
+ return next_datafile;
}
-void rrdeng_test_quota(struct rrdengine_worker_config* wc)
+void find_uuid_first_time(
+ struct rrdengine_instance *ctx,
+ struct rrdengine_datafile *datafile,
+ struct uuid_first_time_s *uuid_first_entry_list,
+ size_t count)
{
- struct rrdengine_instance *ctx = wc->ctx;
- struct rrdengine_datafile *datafile;
- unsigned current_size, target_size;
- uint8_t out_of_space, only_one_datafile;
- int ret, error;
-
- out_of_space = 0;
- /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */
- if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) {
- out_of_space = 1;
- }
- datafile = ctx->datafiles.last;
- current_size = datafile->pos;
- target_size = ctx->max_disk_space / TARGET_DATAFILES;
- target_size = MIN(target_size, MAX_DATAFILE_SIZE);
- target_size = MAX(target_size, MIN_DATAFILE_SIZE);
- only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0;
- if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
- /* Finalize data and journal file and create a new pair */
- wal_flush_transaction_buffer(wc);
- ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
- if (likely(!ret)) {
- ++ctx->last_fileno;
+ // acquire the datafile to work with it
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
+ datafile = datafile->next;
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ if (unlikely(!datafile))
+ return;
+
+ unsigned journalfile_count = 0;
+ size_t binary_match = 0;
+ size_t not_matching_bsearches = 0;
+
+ while (datafile) {
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0);
+ if (!j2_header) {
+ datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
+ continue;
+ }
+
+ time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+ struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
+ struct uuid_first_time_s *uuid_original_entry;
+
+ size_t journal_metric_count = j2_header->metric_count;
+
+ for (size_t index = 0; index < count; ++index) {
+ uuid_original_entry = &uuid_first_entry_list[index];
+
+ // Check here if we should skip this
+ if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
+ continue;
+
+ struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare);
+ if (!live_entry) {
+ // Not found in this journal
+ not_matching_bsearches++;
+ continue;
+ }
+
+ uuid_original_entry->pages_found += live_entry->entries;
+ uuid_original_entry->df_matched++;
+
+ time_t old_first_time_s = uuid_original_entry->first_time_s;
+
+ // Calculate first / last for this match
+ time_t first_time_s = live_entry->delta_start_s + journal_start_time_s;
+ uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s);
+
+ if (uuid_original_entry->first_time_s != old_first_time_s)
+ uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched;
+
+ binary_match++;
}
+
+ journalfile_count++;
+ journalfile_v2_data_release(datafile->journalfile);
+ datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
}
- if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) {
- /* delete old data */
- if (wc->now_deleting_files) {
- /* already deleting data */
- return;
+
+ // Let's scan the open cache for almost exact match
+ size_t open_cache_count = 0;
+
+ size_t df_index[10] = { 0 };
+ size_t without_metric = 0;
+ size_t open_cache_gave_first_time_s = 0;
+ size_t metric_count = 0;
+ size_t without_retention = 0;
+ size_t not_needed_bsearches = 0;
+
+ for (size_t index = 0; index < count; ++index) {
+ struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index];
+
+ metric_count++;
+
+ size_t idx = uuid_first_t_entry->df_index_oldest;
+ if(idx >= 10)
+ idx = 9;
+
+ df_index[idx]++;
+
+ not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest;
+
+ if (unlikely(!uuid_first_t_entry->metric)) {
+ without_metric++;
+ continue;
}
- if (NULL == ctx->datafiles.first->next) {
- error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
- " to reclaim space, there are no other file pairs left.",
- ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
- return;
+
+ PGC_PAGE *page = pgc_page_get_and_acquire(
+ open_cache, (Word_t)ctx,
+ (Word_t)uuid_first_t_entry->metric, 0,
+ PGC_SEARCH_FIRST);
+
+ if (page) {
+ time_t old_first_time_s = uuid_first_t_entry->first_time_s;
+
+ time_t first_time_s = pgc_page_start_time_s(page);
+ uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s);
+ pgc_page_release(open_cache, page);
+ open_cache_count++;
+
+ if(uuid_first_t_entry->first_time_s != old_first_time_s) {
+ open_cache_gave_first_time_s++;
+ }
}
- info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
- wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
- wc->cleanup_thread_deleting_files = 0;
-
- error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
- if (error) {
- error("uv_thread_create(): %s", uv_strerror(error));
- freez(wc->now_deleting_files);
- wc->now_deleting_files = NULL;
+ else {
+ if(!uuid_first_t_entry->df_index_oldest)
+ without_retention++;
}
}
+ internal_error(true,
+ "DBENGINE: analyzed the retention of %zu rotated metrics of tier %d, "
+ "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, "
+ "%zu metrics with entries in open cache, "
+ "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), "
+ "open cache found first time %zu, "
+ "metrics without any remaining retention %zu, "
+ "metrics not in MRG %zu",
+ metric_count,
+ ctx->config.tier,
+ binary_match,
+ not_matching_bsearches,
+ not_needed_bsearches,
+ journalfile_count,
+ open_cache_count,
+ df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9],
+ open_cache_gave_first_time_s,
+ without_retention,
+ without_metric
+ );
}
-static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
-{
- if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
- return 1;
+static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
+
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
+
+ struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile;
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
+ struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
+
+ size_t count = j2_header->metric_count;
+ struct uuid_first_time_s *uuid_first_t_entry;
+ struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s));
+
+ size_t added = 0;
+ for (size_t index = 0; index < count; ++index) {
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx);
+ if (!metric)
+ continue;
+
+ uuid_first_entry_list[added].metric = metric;
+ uuid_first_entry_list[added].first_time_s = LONG_MAX;
+ uuid_first_entry_list[added].df_matched = 0;
+ uuid_first_entry_list[added].df_index_oldest = 0;
+ uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric);
+ added++;
+ }
+
+ info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
+ ctx->config.tier, count, first_datafile_remaining->fileno);
+
+ journalfile_v2_data_release(journalfile);
+
+ // Update the first time / last time for all metrics we plan to delete
+
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
+
+ find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
+
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
+
+ info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
+ ctx->config.tier, added);
+
+ size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0;
+ for (size_t index = 0; index < added; ++index) {
+ uuid_first_t_entry = &uuid_first_entry_list[index];
+ if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
+ mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+ mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
+ }
+ else {
+ zero_disk_retention++;
+
+ // there is no retention for this metric
+ bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
+ if (!has_retention) {
+ bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
+ if(deleted)
+ deleted_metrics++;
+ else
+ zero_retention_referenced++;
+ }
+ else {
+ zero_disk_but_live++;
+ mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
+ }
+ }
}
- return 0;
+ freez(uuid_first_entry_list);
+
+ internal_error(zero_disk_retention,
+ "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry",
+ deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier);
+
+ if(worker)
+ worker_is_idle();
}
-static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
-{
- struct rrdengine_instance *ctx = wc->ctx;
+void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker) {
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
+
+ bool datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
+
+ if (update_retention)
+ update_metrics_first_time_s(ctx, datafile, datafile->next, worker);
- if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
- after_invalidate_oldest_committed(wc);
+ while (!datafile_got_for_deletion) {
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
+
+ datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
+
+ if (!datafile_got_for_deletion) {
+ info("DBENGINE: waiting for data file '%s/"
+ DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
+ "' to be available for deletion, "
+ "it is in use currently by %u users.",
+ ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno, datafile->users.lockers);
+
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_spin, 1, __ATOMIC_RELAXED);
+ sleep_usec(1 * USEC_PER_SEC);
+ }
}
- if (unlikely(wc->cleanup_thread_deleting_files)) {
- after_delete_old_data(wc);
+
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED);
+ info("DBENGINE: deleting data file '%s/"
+ DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
+ "'.",
+ ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+
+ if(worker)
+ worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE);
+
+ struct rrdengine_journalfile *journal_file;
+ unsigned deleted_bytes, journal_file_bytes, datafile_bytes;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+ datafile_list_delete_unsafe(ctx, datafile);
+ uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
+
+ journal_file = datafile->journalfile;
+ datafile_bytes = datafile->pos;
+ journal_file_bytes = journalfile_current_size(journal_file);
+ deleted_bytes = journalfile_v2_data_size_get(journal_file);
+
+ info("DBENGINE: deleting data and journal files to maintain disk quota");
+ ret = journalfile_destroy_unsafe(journal_file, datafile);
+ if (!ret) {
+ journalfile_v1_generate_path(datafile, path, sizeof(path));
+ info("DBENGINE: deleted journal file \"%s\".", path);
+ journalfile_v2_generate_path(datafile, path, sizeof(path));
+ info("DBENGINE: deleted journal file \"%s\".", path);
+ deleted_bytes += journal_file_bytes;
}
- if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) {
- ctx->quiesce = QUIESCED;
- completion_mark_complete(&ctx->rrdengine_completion);
+ ret = destroy_data_file_unsafe(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("DBENGINE: deleted data file \"%s\".", path);
+ deleted_bytes += datafile_bytes;
}
+ freez(journal_file);
+ freez(datafile);
+
+ ctx_current_disk_space_decrease(ctx, deleted_bytes);
+ info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
}
-/* return 0 on success */
-int init_rrd_files(struct rrdengine_instance *ctx)
-{
- int ret = init_data_files(ctx);
-
- BUFFER *wb = buffer_create(1000);
- size_t all_errors = 0;
- usec_t now = now_realtime_usec();
-
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) {
- buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter
- , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter;
- }
+static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true);
- if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) {
- buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter
- , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter;
- }
+ if (rrdeng_ctx_exceeded_disk_quota(ctx))
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
- if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) {
- buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter
- , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter;
- }
+ rrdcontext_db_rotation();
- if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) {
- buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter
- , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter;
- }
+ return data;
+}
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) {
- buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter
- , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter;
- }
+static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ ;
+}
+
+static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_QUIESCE);
+ pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
+ completion_mark_complete(&ctx->quiesce.completion);
+ return data;
+}
+
+static void after_populate_mrg(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ ;
+}
+
+static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
+
+ do {
+ struct rrdengine_datafile *datafile = NULL;
+
+ // find a datafile to work
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
+ if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock))
+ continue;
+
+ if(datafile->populate_mrg.populated) {
+ netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
+ continue;
+ }
+
+ // we have the spinlock and it is not populated
+ break;
+ }
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+
+ if(!datafile)
+ break;
+
+ journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
+ datafile->populate_mrg.populated = true;
+ netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
- if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) {
- buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages"
- , (all_errors)?", ":""
- , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter
- );
- all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter;
+ } while(1);
+
+ completion_mark_complete(completion);
+
+ return data;
+}
+
+static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ ;
+}
+
+static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN);
+
+ completion_wait_for(&ctx->quiesce.completion);
+ completion_destroy(&ctx->quiesce.completion);
+
+ bool logged = false;
+ while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) ||
+ __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
+ if(!logged) {
+ logged = true;
+ info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
+ __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED),
+ (ctx->config.legacy) ? -1 : ctx->config.tier);
+ }
+ sleep_usec(1 * USEC_PER_MS);
}
- if(all_errors)
- info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb));
+ completion_mark_complete(completion);
- buffer_free(wb);
- return ret;
+ return data;
}
-void finalize_rrd_files(struct rrdengine_instance *ctx)
-{
- return finalize_data_files(ctx);
+static void *cache_flush_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ if (!main_cache)
+ return data;
+
+ worker_is_busy(UV_EVENT_DBENGINE_FLUSH_MAIN_CACHE);
+ pgc_flush_pages(main_cache, 0);
+
+ return data;
}
-void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
-{
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- wc->queue_size = 0;
- fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
- fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
+static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
+ if (!main_cache)
+ return data;
+
+ worker_is_busy(UV_EVENT_DBENGINE_EVICT_MAIN_CACHE);
+ pgc_evict_pages(main_cache, 0, 0);
+
+ return data;
}
-void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
-{
- unsigned queue_size;
+static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ ;
+}
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
+static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_QUERY);
+ PDC *pdc = data;
+ rrdeng_prep_query(pdc);
+ return data;
+}
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&wc->async));
+unsigned rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
+ unsigned target_size = ctx->config.max_disk_space / TARGET_DATAFILES;
+ target_size = MIN(target_size, MAX_DATAFILE_SIZE);
+ target_size = MAX(target_size, MIN_DATAFILE_SIZE);
+ return target_size;
}
-struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
+bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx)
{
- struct rrdeng_cmd ret;
- unsigned queue_size;
-
- uv_mutex_lock(&wc->cmd_mutex);
- queue_size = wc->queue_size;
- if (queue_size == 0) {
- ret.opcode = RRDENG_NOOP;
- } else {
- /* dequeue command */
- ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
- if (queue_size == 1) {
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- } else {
- wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.head + 1 : 0;
- }
- wc->queue_size = queue_size - 1;
+ uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) -
+ (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0);
- /* wake up producers */
- uv_cond_signal(&wc->cmd_cond);
- }
- uv_mutex_unlock(&wc->cmd_mutex);
+ return estimated_disk_space > ctx->config.max_disk_space;
+}
- return ret;
+/* return 0 on success */
+int init_rrd_files(struct rrdengine_instance *ctx)
+{
+ return init_data_files(ctx);
}
-static void load_configuration_dynamic(void)
+void finalize_rrd_files(struct rrdengine_instance *ctx)
{
- unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT);
- if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT)
- rrdeng_pages_per_extent = read_num;
- else {
- error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent);
- config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent);
- }
+ return finalize_data_files(ctx);
}
void async_cb(uv_async_t *handle)
@@ -1253,256 +1392,413 @@ void async_cb(uv_async_t *handle)
debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
-/* Flushes dirty pages when timer expires */
#define TIMER_PERIOD_MS (1000)
-void timer_cb(uv_timer_t* handle)
-{
- worker_is_busy(RRDENG_MAX_OPCODE + 1);
- struct rrdengine_worker_config* wc = handle->data;
- struct rrdengine_instance *ctx = wc->ctx;
+static void *extent_read_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ EPDL *epdl = data;
+ epdl_find_extent_and_populate_pages(ctx, epdl, true);
+ return data;
+}
- uv_stop(handle->loop);
- uv_update_time(handle->loop);
- rrdeng_test_quota(wc);
- debug(D_RRDENGINE, "%s: timeout reached.", __func__);
- if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
- /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
- high_watermark;
-
- uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
- nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
- producers = ctx->metric_API_max_producers;
- /* are flushable pages more than 25% of the maximum page cache size */
- high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
- low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
-
- /* Flush more pages only if disk can keep up */
- if (wc->inflight_dirty_pages < high_watermark + producers) {
- if (nr_committed_pages > producers &&
- /* committed to be written pages are more than the produced number */
- nr_committed_pages - producers > high_watermark) {
- /* Flushing speed must increase to stop page cache from filling with dirty pages */
- bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
- }
- bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+static void epdl_populate_pages_asynchronously(struct rrdengine_instance *ctx, EPDL *epdl, STORAGE_PRIORITY priority) {
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_READ, epdl, NULL, priority,
+ rrdeng_enqueue_epdl_cmd, rrdeng_dequeue_epdl_cmd);
+}
- debug(D_RRDENGINE, "Flushing pages to disk.");
- for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
- bytes_written && (total_bytes < bytes_to_write);
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 0, NULL);
- }
+void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
+ pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_asynchronously, epdl_populate_pages_asynchronously);
+}
+
+void epdl_populate_pages_synchronously(struct rrdengine_instance *ctx, EPDL *epdl, enum storage_priority priority __maybe_unused) {
+ epdl_find_extent_and_populate_pages(ctx, epdl, false);
+}
+
+void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
+ pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously);
+}
+
+#define MAX_RETRIES_TO_START_INDEX (100)
+static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ unsigned count = 0;
+ worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX_WAIT);
+
+ while (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && count++ < MAX_RETRIES_TO_START_INDEX)
+ sleep_usec(100 * USEC_PER_MS);
+
+ if (count == MAX_RETRIES_TO_START_INDEX) {
+ worker_is_idle();
+ return data;
+ }
+
+ struct rrdengine_datafile *datafile = ctx->datafiles.first;
+ worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX);
+ count = 0;
+ while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) {
+
+ netdata_spinlock_lock(&datafile->writers.spinlock);
+ bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
+ netdata_spinlock_unlock(&datafile->writers.spinlock);
+
+ if(!available)
+ continue;
+
+ if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) {
+ info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
+ journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
+ count++;
}
+
+ datafile = datafile->next;
+
+ if (unlikely(!ctx_is_available_for_queries(ctx)))
+ break;
}
- load_configuration_dynamic();
-#ifdef NETDATA_INTERNAL_CHECKS
+
+ errno = 0;
+ internal_error(count, "DBENGINE: journal indexing done; %u files processed", count);
+
+ worker_is_idle();
+
+ return data;
+}
+
+static void after_do_cache_flush(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ rrdeng_main.flushes_running--;
+}
+
+static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ rrdeng_main.evictions_running--;
+}
+
+static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ ;
+}
+
+static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+}
+
+struct rrdeng_buffer_sizes rrdeng_get_buffer_sizes(void) {
+ return (struct rrdeng_buffer_sizes) {
+ .pgc = pgc_aral_overhead() + pgc_aral_structures(),
+ .mrg = mrg_aral_overhead() + mrg_aral_structures(),
+ .opcodes = aral_overhead(rrdeng_main.cmd_queue.ar) + aral_structures(rrdeng_main.cmd_queue.ar),
+ .handles = aral_overhead(rrdeng_main.handles.ar) + aral_structures(rrdeng_main.handles.ar),
+ .descriptors = aral_overhead(rrdeng_main.descriptors.ar) + aral_structures(rrdeng_main.descriptors.ar),
+ .wal = __atomic_load_n(&wal_globals.atomics.allocated, __ATOMIC_RELAXED) * (sizeof(WAL) + RRDENG_BLOCK_SIZE),
+ .workers = aral_overhead(rrdeng_main.work_cmd.ar),
+ .pdc = pdc_cache_size(),
+ .xt_io = aral_overhead(rrdeng_main.xt_io_descr.ar) + aral_structures(rrdeng_main.xt_io_descr.ar),
+ .xt_buf = extent_buffer_cache_size(),
+ .epdl = epdl_cache_size(),
+ .deol = deol_cache_size(),
+ .pd = pd_cache_size(),
+
+#ifdef PDC_USE_JULYL
+ .julyl = julyl_cache_size(),
+#endif
+ };
+}
+
+static void after_cleanup(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
+ rrdeng_main.cleanup_running--;
+}
+
+static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
+ worker_is_busy(UV_EVENT_DBENGINE_BUFFERS_CLEANUP);
+
+ wal_cleanup1();
+ extent_buffer_cleanup1();
+
{
- char buf[4096];
- debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
+ static time_t last_run_s = 0;
+ time_t now_s = now_monotonic_sec();
+ if(now_s - last_run_s >= 10) {
+ last_run_s = now_s;
+ journalfile_v2_data_unmount_cleanup(now_s);
+ }
}
+
+#ifdef PDC_USE_JULYL
+ julyl_cleanup1();
#endif
+ return data;
+}
+
+void timer_cb(uv_timer_t* handle) {
+ worker_is_busy(RRDENG_TIMER_CB);
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+
+ worker_set_metric(RRDENG_OPCODES_WAITING, (NETDATA_DOUBLE)rrdeng_main.cmd_queue.unsafe.waiting);
+ worker_set_metric(RRDENG_WORKS_DISPATCHED, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED));
+ worker_set_metric(RRDENG_WORKS_EXECUTING, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.executing, __ATOMIC_RELAXED));
+
+ rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+ rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+ rrdeng_enq_cmd(NULL, RRDENG_OPCODE_CLEANUP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+
worker_is_idle();
}
-#define MAX_CMD_BATCH_SIZE (256)
+static void dbengine_initialize_structures(void) {
+ pgc_and_mrg_initialize();
+
+ pdc_init();
+ page_details_init();
+ epdl_init();
+ deol_init();
+ rrdeng_cmd_queue_init();
+ work_request_init();
+ rrdeng_query_handle_init();
+ page_descriptors_init();
+ extent_buffer_init();
+ dbengine_page_alloc_init();
+ extent_io_descriptor_init();
+}
-void rrdeng_worker(void* arg)
-{
- worker_register("DBENGINE");
- worker_register_job_name(RRDENG_NOOP, "noop");
- worker_register_job_name(RRDENG_READ_PAGE, "page read");
- worker_register_job_name(RRDENG_READ_EXTENT, "extent read");
- worker_register_job_name(RRDENG_COMMIT_PAGE, "commit");
- worker_register_job_name(RRDENG_FLUSH_PAGES, "flush");
- worker_register_job_name(RRDENG_SHUTDOWN, "shutdown");
- worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru");
- worker_register_job_name(RRDENG_QUIESCE, "quiesce");
- worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup");
- worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer");
-
- struct rrdengine_worker_config* wc = arg;
- struct rrdengine_instance *ctx = wc->ctx;
- uv_loop_t* loop;
- int shutdown, ret;
- enum rrdeng_opcode opcode;
- uv_timer_t timer_req;
- struct rrdeng_cmd cmd;
- unsigned cmd_batch_size;
+bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
+ static bool spawned = false;
+ static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
- rrdeng_init_cmd_queue(wc);
+ netdata_spinlock_lock(&spinlock);
- loop = wc->loop = mallocz(sizeof(uv_loop_t));
- ret = uv_loop_init(loop);
- if (ret) {
- error("uv_loop_init(): %s", uv_strerror(ret));
- goto error_after_loop_init;
- }
- loop->data = wc;
+ if(!spawned) {
+ int ret;
- ret = uv_async_init(wc->loop, &wc->async, async_cb);
- if (ret) {
- error("uv_async_init(): %s", uv_strerror(ret));
- goto error_after_async_init;
- }
- wc->async.data = wc;
+ ret = uv_loop_init(&rrdeng_main.loop);
+ if (ret) {
+ error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
+ return false;
+ }
+ rrdeng_main.loop.data = &rrdeng_main;
- wc->now_deleting_files = NULL;
- wc->cleanup_thread_deleting_files = 0;
+ ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb);
+ if (ret) {
+ error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
+ fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
+ return false;
+ }
+ rrdeng_main.async.data = &rrdeng_main;
+
+ ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer);
+ if (ret) {
+ error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
+ uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
+ fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
+ return false;
+ }
+ rrdeng_main.timer.data = &rrdeng_main;
- wc->now_invalidating_dirty_pages = NULL;
- wc->cleanup_thread_invalidating_dirty_pages = 0;
- wc->inflight_dirty_pages = 0;
+ dbengine_initialize_structures();
- /* dirty page flushing timer */
- ret = uv_timer_init(loop, &timer_req);
- if (ret) {
- error("uv_timer_init(): %s", uv_strerror(ret));
- goto error_after_timer_init;
+ fatal_assert(0 == uv_thread_create(&rrdeng_main.thread, dbengine_event_loop, &rrdeng_main));
+ spawned = true;
}
- timer_req.data = wc;
- wc->error = 0;
- /* wake up initialization thread */
- completion_mark_complete(&ctx->rrdengine_completion);
+ netdata_spinlock_unlock(&spinlock);
+ return true;
+}
- fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
- shutdown = 0;
- int set_name = 0;
- while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
+void dbengine_event_loop(void* arg) {
+ sanity_check();
+ uv_thread_set_name_np(pthread_self(), "DBENGINE");
+ service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
+
+ worker_register("DBENGINE");
+
+ // opcode jobs
+ worker_register_job_name(RRDENG_OPCODE_NOOP, "noop");
+
+ worker_register_job_name(RRDENG_OPCODE_QUERY, "query");
+ worker_register_job_name(RRDENG_OPCODE_EXTENT_WRITE, "extent write");
+ worker_register_job_name(RRDENG_OPCODE_EXTENT_READ, "extent read");
+ worker_register_job_name(RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open");
+ worker_register_job_name(RRDENG_OPCODE_DATABASE_ROTATE, "db rotate");
+ worker_register_job_name(RRDENG_OPCODE_JOURNAL_INDEX, "journal index");
+ worker_register_job_name(RRDENG_OPCODE_FLUSH_INIT, "flush init");
+ worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init");
+ worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown");
+ worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce");
+
+ worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode");
+
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_QUERY, "query cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE, "extent write cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_READ, "extent read cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_DATABASE_ROTATE, "db rotate cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_JOURNAL_INDEX, "journal index cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSH_INIT, "flush init cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EVICT_INIT, "evict init cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown cb");
+ worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce cb");
+
+ // special jobs
+ worker_register_job_name(RRDENG_TIMER_CB, "timer");
+ worker_register_job_name(RRDENG_FLUSH_TRANSACTION_BUFFER_CB, "transaction buffer flush cb");
+
+ worker_register_job_custom_metric(RRDENG_OPCODES_WAITING, "opcodes waiting", "opcodes", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(RRDENG_WORKS_DISPATCHED, "works dispatched", "works", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(RRDENG_WORKS_EXECUTING, "works executing", "works", WORKER_METRIC_ABSOLUTE);
+
+ struct rrdeng_main *main = arg;
+ enum rrdeng_opcode opcode;
+ struct rrdeng_cmd cmd;
+ main->tid = gettid();
+
+ fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+
+ bool shutdown = false;
+ while (likely(!shutdown)) {
worker_is_idle();
- uv_run(loop, UV_RUN_DEFAULT);
- worker_is_busy(RRDENG_MAX_OPCODE);
- rrdeng_cleanup_finished_threads(wc);
+ uv_run(&main->loop, UV_RUN_DEFAULT);
/* wait for commands */
- cmd_batch_size = 0;
do {
- /*
- * Avoid starving the loop when there are too many commands coming in.
- * timer_cb will interrupt the loop again to allow serving more commands.
- */
- if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
- break;
-
- cmd = rrdeng_deq_cmd(wc);
+ worker_is_busy(RRDENG_OPCODE_MAX);
+ cmd = rrdeng_deq_cmd();
opcode = cmd.opcode;
- ++cmd_batch_size;
- if(likely(opcode != RRDENG_NOOP))
- worker_is_busy(opcode);
+ worker_is_busy(opcode);
switch (opcode) {
- case RRDENG_NOOP:
- /* the command queue was empty, do nothing */
- break;
- case RRDENG_SHUTDOWN:
- shutdown = 1;
- break;
- case RRDENG_QUIESCE:
- ctx->drop_metrics_under_page_cache_pressure = 0;
- ctx->quiesce = SET_QUIESCE;
- fatal_assert(0 == uv_timer_stop(&timer_req));
- uv_close((uv_handle_t *)&timer_req, NULL);
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all committed pages. */
+ case RRDENG_OPCODE_EXTENT_READ: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ EPDL *epdl = cmd.data;
+ work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read);
+ break;
}
- wal_flush_transaction_buffer(wc);
- if (!rrdeng_threads_alive(wc)) {
- ctx->quiesce = QUIESCED;
- completion_mark_complete(&ctx->rrdengine_completion);
+
+ case RRDENG_OPCODE_QUERY: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ PDC *pdc = cmd.data;
+ work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query);
+ break;
}
- break;
- case RRDENG_READ_PAGE:
- do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
- break;
- case RRDENG_READ_EXTENT:
- do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1);
- if (unlikely(!set_name)) {
- set_name = 1;
- uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE");
+
+ case RRDENG_OPCODE_EXTENT_WRITE: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ struct page_descr_with_data *base = cmd.data;
+ struct completion *completion = cmd.completion; // optional
+ work_dispatch(ctx, base, completion, opcode, extent_write_tp_worker, after_extent_write);
+ break;
}
- break;
- case RRDENG_COMMIT_PAGE:
- do_commit_transaction(wc, STORE_DATA, NULL);
- break;
- case RRDENG_FLUSH_PAGES: {
- if (wc->now_invalidating_dirty_pages) {
- /* Do not flush if the disk cannot keep up */
- completion_mark_complete(cmd.completion);
- } else {
- (void)do_flush_pages(wc, 1, cmd.completion);
+
+ case RRDENG_OPCODE_FLUSHED_TO_OPEN: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ uv_fs_t *uv_fs_request = cmd.data;
+ struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
+ struct completion *completion = xt_io_descr->completion;
+ work_dispatch(ctx, uv_fs_request, completion, opcode, extent_flushed_to_open_tp_worker, after_extent_flushed_to_open);
+ break;
+ }
+
+ case RRDENG_OPCODE_FLUSH_INIT: {
+ if(rrdeng_main.flushes_running < (size_t)(libuv_worker_threads / 4)) {
+ rrdeng_main.flushes_running++;
+ work_dispatch(NULL, NULL, NULL, opcode, cache_flush_tp_worker, after_do_cache_flush);
+ }
+ break;
+ }
+
+ case RRDENG_OPCODE_EVICT_INIT: {
+ if(!rrdeng_main.evictions_running) {
+ rrdeng_main.evictions_running++;
+ work_dispatch(NULL, NULL, NULL, opcode, cache_evict_tp_worker, after_do_cache_evict);
+ }
+ break;
+ }
+
+ case RRDENG_OPCODE_CLEANUP: {
+ if(!rrdeng_main.cleanup_running) {
+ rrdeng_main.cleanup_running++;
+ work_dispatch(NULL, NULL, NULL, opcode, cleanup_tp_worker, after_cleanup);
+ }
+ break;
+ }
+
+ case RRDENG_OPCODE_JOURNAL_INDEX: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ struct rrdengine_datafile *datafile = cmd.data;
+ if(!__atomic_load_n(&ctx->atomic.migration_to_v2_running, __ATOMIC_RELAXED)) {
+
+ __atomic_store_n(&ctx->atomic.migration_to_v2_running, true, __ATOMIC_RELAXED);
+ work_dispatch(ctx, datafile, NULL, opcode, journal_v2_indexing_tp_worker, after_journal_v2_indexing);
+ }
+ break;
+ }
+
+ case RRDENG_OPCODE_DATABASE_ROTATE: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) &&
+ ctx->datafiles.first->next != NULL &&
+ ctx->datafiles.first->next->next != NULL &&
+ rrdeng_ctx_exceeded_disk_quota(ctx)) {
+
+ __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED);
+ work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate);
+ }
+ break;
+ }
+
+ case RRDENG_OPCODE_CTX_POPULATE_MRG: {
+ struct rrdengine_instance *ctx = cmd.ctx;
+ struct completion *completion = cmd.completion;
+ work_dispatch(ctx, NULL, completion, opcode, populate_mrg_tp_worker, after_populate_mrg);
+ break;
+ }
+
+ case RRDENG_OPCODE_CTX_QUIESCE: {
+ // a ctx will shutdown shortly
+ struct rrdengine_instance *ctx = cmd.ctx;
+ __atomic_store_n(&ctx->quiesce.enabled, true, __ATOMIC_RELEASE);
+ work_dispatch(ctx, NULL, NULL, opcode,
+ flush_all_hot_and_dirty_pages_of_section_tp_worker,
+ after_flush_all_hot_and_dirty_pages_of_section);
+ break;
+ }
+
+ case RRDENG_OPCODE_CTX_SHUTDOWN: {
+ // a ctx is shutting down
+ struct rrdengine_instance *ctx = cmd.ctx;
+ struct completion *completion = cmd.completion;
+ work_dispatch(ctx, NULL, completion, opcode, ctx_shutdown_tp_worker, after_ctx_shutdown);
+ break;
+ }
+
+ case RRDENG_OPCODE_NOOP: {
+ /* the command queue was empty, do nothing */
+ break;
+ }
+
+ // not opcodes
+ case RRDENG_OPCODE_MAX:
+ default: {
+ internal_fatal(true, "DBENGINE: unknown opcode");
+ break;
}
- break;
- case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
- rrdeng_invalidate_oldest_committed(wc);
- break;
- }
- default:
- debug(D_RRDENGINE, "%s: default.", __func__);
- break;
}
- } while (opcode != RRDENG_NOOP);
+
+ } while (opcode != RRDENG_OPCODE_NOOP);
}
/* cleanup operations of the event loop */
- info("Shutting down RRD engine event loop for tier %d", ctx->tier);
+ info("DBENGINE: shutting down dbengine thread");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
* an issue in the future.
*/
- uv_close((uv_handle_t *)&wc->async, NULL);
-
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all committed pages. */
- }
- wal_flush_transaction_buffer(wc);
- uv_run(loop, UV_RUN_DEFAULT);
-
- info("Shutting down RRD engine event loop for tier %d complete", ctx->tier);
- /* TODO: don't let the API block by waiting to enqueue commands */
- uv_cond_destroy(&wc->cmd_cond);
-/* uv_mutex_destroy(&wc->cmd_mutex); */
- fatal_assert(0 == uv_loop_close(loop));
- freez(loop);
-
+ uv_close((uv_handle_t *)&main->async, NULL);
+ uv_timer_stop(&main->timer);
+ uv_close((uv_handle_t *)&main->timer, NULL);
+ uv_run(&main->loop, UV_RUN_DEFAULT);
+ uv_loop_close(&main->loop);
worker_unregister();
- return;
-
-error_after_timer_init:
- uv_close((uv_handle_t *)&wc->async, NULL);
-error_after_async_init:
- fatal_assert(0 == uv_loop_close(loop));
-error_after_loop_init:
- freez(loop);
-
- wc->error = UV_EAGAIN;
- /* wake up initialization thread */
- completion_mark_complete(&ctx->rrdengine_completion);
- worker_unregister();
-}
-
-/* C entry point for development purposes
- * make "LDFLAGS=-errdengine_main"
- */
-void rrdengine_main(void)
-{
- int ret;
- struct rrdengine_instance *ctx;
-
- sanity_check();
- ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0);
- if (ret) {
- exit(ret);
- }
- rrdeng_exit(ctx);
- fprintf(stderr, "Hello world!");
- exit(0);
}