diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 188 |
1 files changed, 144 insertions, 44 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 8b35051d8..e4cd37e98 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -30,7 +30,7 @@ void dbengine_page_free(void *page) { if (unlikely(db_engine_use_malloc)) freez(page); else - munmap(page, RRDENG_BLOCK_SIZE); + netdata_munmap(page, RRDENG_BLOCK_SIZE); } static void sanity_check(void) @@ -206,8 +206,8 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str /* care, we don't hold the descriptor mutex */ if (!uuid_compare(*extent->pages[j]->id, *descr->id) && extent->pages[j]->page_length == descr->page_length && - extent->pages[j]->start_time == descr->start_time && - extent->pages[j]->end_time == descr->end_time) { + extent->pages[j]->start_time_ut == descr->start_time_ut && + extent->pages[j]->end_time_ut == descr->end_time_ut) { break; } page_offset += extent->pages[j]->page_length; @@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) } } -void read_extent_cb(uv_fs_t* req) +static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) { - struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; - struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; @@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req) struct rrdeng_df_extent_trailer *trailer; uLong crc; - xt_io_descr = req->data; header = xt_io_descr->buf; payload_length = header->payload_length; count = header->number_of_pages; payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); - if (req->result < 0) { + if (unlikely(read_failed)) { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); have_read_error = 1; - error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, - uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos, + xt_io_descr->bytes, datafile->tier, datafile->fileno); goto after_crc_check; } crc = crc32(0L, Z_NULL, 0); @@ -378,8 +375,8 @@ after_crc_check: /* care, we don't hold the descriptor mutex */ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) && header->descr[i].page_length == descrj->page_length && - header->descr[i].start_time == descrj->start_time && - header->descr[i].end_time == descrj->end_time) { + header->descr[i].start_time_ut == descrj->start_time_ut && + header->descr[i].end_time_ut == descrj->end_time_ut) { descr = descrj; break; } @@ -387,7 +384,7 @@ after_crc_check: is_prefetched_page = 0; if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */ descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid, - header->descr[i].start_time); + header->descr[i].start_time_ut); if (!descr) continue; /* Failed to reserve a suitable page */ is_prefetched_page = 1; @@ -421,11 +418,67 @@ after_crc_check: } if (xt_io_descr->completion) completion_mark_complete(xt_io_descr->completion); +} + +static void read_extent_cb(uv_fs_t *req) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct extent_io_descriptor *xt_io_descr; + + xt_io_descr = req->data; + do_extent_processing(wc, xt_io_descr, req->result < 0); uv_fs_req_cleanup(req); - free(xt_io_descr->buf); + posix_memfree(xt_io_descr->buf); freez(xt_io_descr); } +static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + xt_io_descr = req->data; + + if (likely(xt_io_descr->map_base)) { + do_extent_processing(wc, xt_io_descr, false); + munmap(xt_io_descr->map_base, xt_io_descr->map_length); + freez(xt_io_descr); + return; + } + + // MMAP failed, so do uv_fs_read + int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + xt_io_descr->req.data = xt_io_descr; + ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb); + fatal_assert(-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; +} + +static void do_mmap_read_extent(uv_work_t *req) +{ + struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data; + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos); + size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start; + unsigned real_io_size = xt_io_descr->bytes; + + void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start); + if (likely(data != MAP_FAILED)) { + xt_io_descr->map_base = data; + xt_io_descr->map_length = length; + xt_io_descr->buf = data + (xt_io_descr->pos - map_start); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; + } +} static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdeng_page_descr **descr, @@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdengine_instance *ctx = wc->ctx; struct page_cache_descr *pg_cache_descr; int ret; - unsigned i, size_bytes, pos, real_io_size; -// uint32_t payload_length; + unsigned i, size_bytes, pos; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; struct extent_info *extent = descr[0]->extent; @@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc, rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; -// payload_length = descr[i]->page_length; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); - xt_io_descr->descr_array[i] = descr[i]; } xt_io_descr->descr_count = count; + xt_io_descr->file = datafile->file; xt_io_descr->bytes = size_bytes; xt_io_descr->pos = pos; - xt_io_descr->req.data = xt_io_descr; + xt_io_descr->req_worker.data = xt_io_descr; xt_io_descr->completion = NULL; - /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_io_descr->buf = NULL; xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); if (xt_is_cached) { @@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc, } } - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } - real_io_size = ALIGN_BYTES_CEILING(size_bytes); - xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); - ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb); fatal_assert(-1 != ret); - ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; - ctx->stats.io_read_extent_bytes += real_io_size; ++ctx->stats.io_read_extents; ctx->stats.pg_cache_backfills += count; } @@ -696,7 +738,7 @@ void flush_pages_cb(uv_fs_t* req) if (xt_io_descr->completion) completion_mark_complete(xt_io_descr->completion); uv_fs_req_cleanup(req); - free(xt_io_descr->buf); + posix_memfree(xt_io_descr->buf); freez(xt_io_descr); uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); @@ -820,8 +862,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct header->descr[i].type = descr->type; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; - header->descr[i].start_time = descr->start_time; - header->descr[i].end_time = descr->end_time; + header->descr[i].start_time_ut = descr->start_time_ut; + header->descr[i].end_time_ut = descr->end_time_ut; pos += sizeof(header->descr[i]); } for (i = 0 ; i < count ; ++i) { @@ -922,7 +964,6 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc) wc->now_deleting_files = NULL; wc->cleanup_thread_deleting_files = 0; - aclk_data_rotated(); rrdcontext_db_rotation(); /* interrupt event loop */ @@ -948,12 +989,12 @@ static void delete_old_data(void *arg) for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); - if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) { + if (unlikely(can_delete_metric)) { /* * If the metric is empty, has no active writers and if the metadata log has been initialized then * attempt to delete the corresponding netdata dimension. */ - metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id); + metaqueue_delete_dimension_uuid(&metric_id); } } next = extent->next; @@ -1044,7 +1085,70 @@ static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) /* return 0 on success */ int init_rrd_files(struct rrdengine_instance *ctx) { - return init_data_files(ctx); + int ret = init_data_files(ctx); + + BUFFER *wb = buffer_create(1000); + size_t all_errors = 0; + usec_t now = now_realtime_usec(); + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) { + buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) { + buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) { + buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter + , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC + ); + all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter; + } + + if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) { + buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages" + , (all_errors)?", ":"" + , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter + ); + all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter; + } + + if(all_errors) + info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb)); + + buffer_free(wb); + return ret; } void finalize_rrd_files(struct rrdengine_instance *ctx) @@ -1139,10 +1243,6 @@ void timer_cb(uv_timer_t* handle) uv_stop(handle->loop); uv_update_time(handle->loop); - if (unlikely(!ctx->metalog_ctx->initialized)) { - worker_is_idle(); - return; /* Wait for the metadata log to initialize */ - } rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { @@ -1329,7 +1429,7 @@ void rrdeng_worker(void* arg) } /* cleanup operations of the event loop */ - info("Shutting down RRD engine event loop."); + info("Shutting down RRD engine event loop for tier %d", ctx->tier); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, @@ -1344,7 +1444,7 @@ void rrdeng_worker(void* arg) wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); - info("Shutting down RRD engine event loop complete."); + info("Shutting down RRD engine event loop for tier %d complete", ctx->tier); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ |