summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c188
1 files changed, 144 insertions, 44 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 8b35051d8..e4cd37e98 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -30,7 +30,7 @@ void dbengine_page_free(void *page) {
if (unlikely(db_engine_use_malloc))
freez(page);
else
- munmap(page, RRDENG_BLOCK_SIZE);
+ netdata_munmap(page, RRDENG_BLOCK_SIZE);
}
static void sanity_check(void)
@@ -206,8 +206,8 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
/* care, we don't hold the descriptor mutex */
if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
extent->pages[j]->page_length == descr->page_length &&
- extent->pages[j]->start_time == descr->start_time &&
- extent->pages[j]->end_time == descr->end_time) {
+ extent->pages[j]->start_time_ut == descr->start_time_ut &&
+ extent->pages[j]->end_time_ut == descr->end_time_ut) {
break;
}
page_offset += extent->pages[j]->page_length;
@@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type)
}
}
-void read_extent_cb(uv_fs_t* req)
+static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
{
- struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
- struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
int ret;
@@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req)
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
- xt_io_descr = req->data;
header = xt_io_descr->buf;
payload_length = header->payload_length;
count = header->number_of_pages;
payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
- if (req->result < 0) {
+ if (unlikely(read_failed)) {
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
have_read_error = 1;
- error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__,
- uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos,
+ xt_io_descr->bytes, datafile->tier, datafile->fileno);
goto after_crc_check;
}
crc = crc32(0L, Z_NULL, 0);
@@ -378,8 +375,8 @@ after_crc_check:
/* care, we don't hold the descriptor mutex */
if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) &&
header->descr[i].page_length == descrj->page_length &&
- header->descr[i].start_time == descrj->start_time &&
- header->descr[i].end_time == descrj->end_time) {
+ header->descr[i].start_time_ut == descrj->start_time_ut &&
+ header->descr[i].end_time_ut == descrj->end_time_ut) {
descr = descrj;
break;
}
@@ -387,7 +384,7 @@ after_crc_check:
is_prefetched_page = 0;
if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */
descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid,
- header->descr[i].start_time);
+ header->descr[i].start_time_ut);
if (!descr)
continue; /* Failed to reserve a suitable page */
is_prefetched_page = 1;
@@ -421,11 +418,67 @@ after_crc_check:
}
if (xt_io_descr->completion)
completion_mark_complete(xt_io_descr->completion);
+}
+
+static void read_extent_cb(uv_fs_t *req)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct extent_io_descriptor *xt_io_descr;
+
+ xt_io_descr = req->data;
+ do_extent_processing(wc, xt_io_descr, req->result < 0);
uv_fs_req_cleanup(req);
- free(xt_io_descr->buf);
+ posix_memfree(xt_io_descr->buf);
freez(xt_io_descr);
}
+static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ xt_io_descr = req->data;
+
+ if (likely(xt_io_descr->map_base)) {
+ do_extent_processing(wc, xt_io_descr, false);
+ munmap(xt_io_descr->map_base, xt_io_descr->map_length);
+ freez(xt_io_descr);
+ return;
+ }
+
+ // MMAP failed, so do uv_fs_read
+ int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ xt_io_descr->req.data = xt_io_descr;
+ ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+}
+
+static void do_mmap_read_extent(uv_work_t *req)
+{
+ struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data;
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos);
+ size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start;
+ unsigned real_io_size = xt_io_descr->bytes;
+
+ void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start);
+ if (likely(data != MAP_FAILED)) {
+ xt_io_descr->map_base = data;
+ xt_io_descr->map_length = length;
+ xt_io_descr->buf = data + (xt_io_descr->pos - map_start);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ }
+}
static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdeng_page_descr **descr,
@@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache_descr *pg_cache_descr;
int ret;
- unsigned i, size_bytes, pos, real_io_size;
-// uint32_t payload_length;
+ unsigned i, size_bytes, pos;
struct extent_io_descriptor *xt_io_descr;
struct rrdengine_datafile *datafile;
struct extent_info *extent = descr[0]->extent;
@@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
rrdeng_page_descr_mutex_lock(ctx, descr[i]);
pg_cache_descr = descr[i]->pg_cache_descr;
pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
-// payload_length = descr[i]->page_length;
rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
-
xt_io_descr->descr_array[i] = descr[i];
}
xt_io_descr->descr_count = count;
+ xt_io_descr->file = datafile->file;
xt_io_descr->bytes = size_bytes;
xt_io_descr->pos = pos;
- xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->req_worker.data = xt_io_descr;
xt_io_descr->completion = NULL;
- /* xt_io_descr->descr_commit_idx_array[0] */
xt_io_descr->release_descr = release_descr;
+ xt_io_descr->buf = NULL;
xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
if (xt_is_cached) {
@@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
}
}
- ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- /* freez(xt_io_descr);
- return;*/
- }
- real_io_size = ALIGN_BYTES_CEILING(size_bytes);
- xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
- ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+ ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb);
fatal_assert(-1 != ret);
- ctx->stats.io_read_bytes += real_io_size;
+
++ctx->stats.io_read_requests;
- ctx->stats.io_read_extent_bytes += real_io_size;
++ctx->stats.io_read_extents;
ctx->stats.pg_cache_backfills += count;
}
@@ -696,7 +738,7 @@ void flush_pages_cb(uv_fs_t* req)
if (xt_io_descr->completion)
completion_mark_complete(xt_io_descr->completion);
uv_fs_req_cleanup(req);
- free(xt_io_descr->buf);
+ posix_memfree(xt_io_descr->buf);
freez(xt_io_descr);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
@@ -820,8 +862,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
header->descr[i].type = descr->type;
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
- header->descr[i].start_time = descr->start_time;
- header->descr[i].end_time = descr->end_time;
+ header->descr[i].start_time_ut = descr->start_time_ut;
+ header->descr[i].end_time_ut = descr->end_time_ut;
pos += sizeof(header->descr[i]);
}
for (i = 0 ; i < count ; ++i) {
@@ -922,7 +964,6 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc)
wc->now_deleting_files = NULL;
wc->cleanup_thread_deleting_files = 0;
- aclk_data_rotated();
rrdcontext_db_rotation();
/* interrupt event loop */
@@ -948,12 +989,12 @@ static void delete_old_data(void *arg)
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
- if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) {
+ if (unlikely(can_delete_metric)) {
/*
* If the metric is empty, has no active writers and if the metadata log has been initialized then
* attempt to delete the corresponding netdata dimension.
*/
- metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id);
+ metaqueue_delete_dimension_uuid(&metric_id);
}
}
next = extent->next;
@@ -1044,7 +1085,70 @@ static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
- return init_data_files(ctx);
+ int ret = init_data_files(ctx);
+
+ BUFFER *wb = buffer_create(1000);
+ size_t all_errors = 0;
+ usec_t now = now_realtime_usec();
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) {
+ buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) {
+ buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) {
+ buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter
+ , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter;
+ }
+
+ if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) {
+ buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages"
+ , (all_errors)?", ":""
+ , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter
+ );
+ all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter;
+ }
+
+ if(all_errors)
+ info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb));
+
+ buffer_free(wb);
+ return ret;
}
void finalize_rrd_files(struct rrdengine_instance *ctx)
@@ -1139,10 +1243,6 @@ void timer_cb(uv_timer_t* handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
- if (unlikely(!ctx->metalog_ctx->initialized)) {
- worker_is_idle();
- return; /* Wait for the metadata log to initialize */
- }
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
@@ -1329,7 +1429,7 @@ void rrdeng_worker(void* arg)
}
/* cleanup operations of the event loop */
- info("Shutting down RRD engine event loop.");
+ info("Shutting down RRD engine event loop for tier %d", ctx->tier);
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
@@ -1344,7 +1444,7 @@ void rrdeng_worker(void* arg)
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
- info("Shutting down RRD engine event loop complete.");
+ info("Shutting down RRD engine event loop for tier %d complete", ctx->tier);
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */