diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 234 |
1 files changed, 151 insertions, 83 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index b8e4eba01..0f2dceaa4 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -3,6 +3,10 @@ #include "rrdengine.h" +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; + void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ @@ -27,12 +31,12 @@ void read_extent_cb(uv_fs_t* req) struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; struct extent_io_descriptor *xt_io_descr; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, j, count; void *page, *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; - struct rrdengine_datafile *datafile; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; @@ -54,9 +58,13 @@ void read_extent_cb(uv_fs_t* req) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); ret = crc32cmp(trailer->checksum, crc); - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, - xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + } +#endif if (unlikely(ret)) { /* TODO: handle errors */ exit(UV_EIO); @@ -97,36 +105,38 @@ void read_extent_cb(uv_fs_t* req) (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); } pg_cache_replaceQ_insert(ctx, descr); - uv_mutex_lock(&descr->mutex); - descr->page = page; - descr->flags |= RRD_PAGE_POPULATED; - descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); if (xt_io_descr->release_descr) { pg_cache_put_unsafe(descr); } else { pg_cache_wake_up_waiters_unsafe(descr); } - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } if (RRD_NO_COMPRESSION != header->compression_algorithm) { - free(uncompressed_buf); + freez(uncompressed_buf); } if (xt_io_descr->completion) complete(xt_io_descr->completion); cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } static void do_read_extent(struct rrdengine_worker_config* wc, - struct rrdeng_page_cache_descr **descr, + struct rrdeng_page_descr **descr, unsigned count, uint8_t release_descr) { struct rrdengine_instance *ctx = wc->ctx; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, size_bytes, pos, real_io_size; // uint32_t payload_length; @@ -141,14 +151,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc, ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr); + /* freez(xt_io_descr); return;*/ } for (i = 0 ; i < count; ++i) { - uv_mutex_lock(&descr[i]->mutex); - descr[i]->flags |= RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_lock(ctx, descr[i]); + pg_cache_descr = descr[i]->pg_cache_descr; + pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; // payload_length = descr[i]->page_length; - uv_mutex_unlock(&descr[i]->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr[i]); xt_io_descr->descr_array[i] = descr[i]; } @@ -227,8 +238,8 @@ void flush_pages_cb(uv_fs_t* req) struct rrdengine_instance *ctx = wc->ctx; struct page_cache *pg_cache = &ctx->pg_cache; struct extent_io_descriptor *xt_io_descr; - struct rrdeng_page_cache_descr *descr; - struct rrdengine_datafile *datafile; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; int ret; unsigned i, count; Word_t commit_id; @@ -238,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req) error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); goto cleanup; } - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", - __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); - +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } +#endif count = xt_io_descr->descr_count; for (i = 0 ; i < count ; ++i) { /* care, we don't hold the descriptor mutex */ @@ -256,18 +270,19 @@ void flush_pages_cb(uv_fs_t* req) pg_cache_replaceQ_insert(ctx, descr); - uv_mutex_lock(&descr->mutex); - descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); /* wake up waiters, care no reference being held */ pg_cache_wake_up_waiters_unsafe(descr); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } if (xt_io_descr->completion) complete(xt_io_descr->completion); cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } /* @@ -283,7 +298,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; uint32_t uncompressed_payload_length, payload_offset; - struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct page_cache_descr *pg_cache_descr; struct extent_io_descriptor *xt_io_descr; void *compressed_buf = NULL; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; @@ -311,15 +327,16 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct descr = unlikely(NULL == PValue) ? NULL : *PValue) { assert(0 != descr->page_length); - uv_mutex_lock(&descr->mutex); - if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) { + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { /* care, no reference being held */ - descr->flags |= RRD_PAGE_WRITE_PENDING; + pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; uncompressed_payload_length += descr->page_length; descr_commit_idx_array[count] = Index; eligible_pages[count++] = descr; } - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); } uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); @@ -345,9 +362,9 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr);*/ + /* freez(xt_io_descr);*/ } - (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count); + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; pos = 0; @@ -378,7 +395,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; /* care, we don't hold the descriptor mutex */ - (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); descr->extent = extent; extent->pages[i] = descr; @@ -397,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ctx->stats.after_compress_bytes += compressed_size; debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - free(compressed_buf); + freez(compressed_buf); size_bytes = payload_offset + compressed_size + sizeof(*trailer); header->payload_length = compressed_size; break; @@ -435,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status) struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; - unsigned bytes; + unsigned deleted_bytes, journalfile_bytes, datafile_bytes; + int ret; + char path[RRDENG_PATH_MAX]; (void)status; datafile = ctx->datafiles.first; journalfile = datafile->journalfile; - bytes = datafile->pos + journalfile->pos; + datafile_bytes = datafile->pos; + journalfile_bytes = journalfile->pos; + deleted_bytes = 0; + info("Deleting data and journal file pair."); datafile_list_delete(ctx, datafile); - destroy_journal_file(journalfile, datafile); - destroy_data_file(datafile); - info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", - datafile->tier, datafile->fileno); - free(journalfile); - free(datafile); + ret = destroy_journal_file(journalfile, datafile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + deleted_bytes += journalfile_bytes; + } + ret = destroy_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; + } + freez(journalfile); + freez(datafile); - ctx->disk_space -= bytes; - info("Reclaimed %u bytes of disk space.", bytes); + ctx->disk_space -= deleted_bytes; + info("Reclaimed %u bytes of disk space.", deleted_bytes); /* unfreeze command processing */ wc->now_deleting.data = NULL; @@ -464,7 +494,7 @@ static void delete_old_data(uv_work_t *req) struct rrdengine_instance *ctx = req->data; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; unsigned count, i; /* Safe to use since it will be deleted after we are done */ @@ -474,10 +504,10 @@ static void delete_old_data(uv_work_t *req) count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; - pg_cache_punch_hole(ctx, descr); + pg_cache_punch_hole(ctx, descr, 0); } next = extent->next; - free(extent); + freez(extent); } } @@ -487,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; + int ret; out_of_space = 0; if (unlikely(ctx->disk_space > ctx->max_disk_space)) { @@ -501,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { /* Finalize data and journal file and create a new pair */ wal_flush_transaction_buffer(wc); - create_new_datafile_pair(ctx, 1, datafile->fileno + 1); + ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); + if (likely(!ret)) { + ++ctx->last_fileno; + } } if (unlikely(out_of_space)) { /* delete old data */ @@ -509,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) /* already deleting data */ return; } - info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", - ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + if (NULL == ctx->datafiles.first->next) { + error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" + " to reclaim space, there are no other file pairs left.", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + return; + } + info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); wc->now_deleting.data = ctx; - uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data); + assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); } } +/* return 0 on success */ int init_rrd_files(struct rrdengine_instance *ctx) { return init_data_files(ctx); } +void finalize_rrd_files(struct rrdengine_instance *ctx) +{ + return finalize_data_files(ctx); +} + void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; @@ -588,7 +634,6 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; - struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); @@ -608,7 +653,7 @@ void timer_cb(uv_timer_t* handle) #ifdef NETDATA_INTERNAL_CHECKS { char buf[4096]; - debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf))); + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); } #endif } @@ -623,7 +668,7 @@ void rrdeng_worker(void* arg) struct rrdengine_worker_config* wc = arg; struct rrdengine_instance *ctx = wc->ctx; uv_loop_t* loop; - int shutdown; + int shutdown, ret; enum rrdeng_opcode opcode; uv_timer_t timer_req; struct rrdeng_cmd cmd; @@ -631,22 +676,35 @@ void rrdeng_worker(void* arg) rrdeng_init_cmd_queue(wc); loop = wc->loop = mallocz(sizeof(uv_loop_t)); - uv_loop_init(loop); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + goto error_after_loop_init; + } loop->data = wc; - uv_async_init(wc->loop, &wc->async, async_cb); + ret = uv_async_init(wc->loop, &wc->async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + goto error_after_async_init; + } wc->async.data = wc; wc->now_deleting.data = NULL; /* dirty page flushing timer */ - uv_timer_init(loop, &timer_req); + ret = uv_timer_init(loop, &timer_req); + if (ret) { + error("uv_timer_init(): %s", uv_strerror(ret)); + goto error_after_timer_init; + } timer_req.data = wc; + wc->error = 0; /* wake up initialization thread */ complete(&ctx->rrdengine_completion); - uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS); + assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; while (shutdown == 0 || uv_loop_alive(loop)) { uv_run(loop, UV_RUN_DEFAULT); @@ -661,12 +719,6 @@ void rrdeng_worker(void* arg) break; case RRDENG_SHUTDOWN: shutdown = 1; - if (unlikely(wc->now_deleting.data)) { - /* postpone shutdown until after deletion */ - info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); - rrdeng_enq_cmd(wc, &cmd); - break; - } /* * uv_async_send after uv_close does not seem to crash in linux at the moment, * it is however undocumented behaviour and we need to be aware if this becomes @@ -675,10 +727,6 @@ void rrdeng_worker(void* arg) uv_close((uv_handle_t *)&wc->async, NULL); assert(0 == uv_timer_stop(&timer_req)); uv_close((uv_handle_t *)&timer_req, NULL); - info("Shutting down RRD engine event loop."); - while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all commited pages. */ - } break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); @@ -690,14 +738,14 @@ void rrdeng_worker(void* arg) do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { - unsigned total_bytes, bytes_written; + unsigned bytes_written; /* First I/O should be enough to call completion */ bytes_written = do_flush_pages(wc, 1, cmd.completion); - for (total_bytes = bytes_written ; - bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 1, NULL); + if (bytes_written) { + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } } break; } @@ -708,6 +756,13 @@ void rrdeng_worker(void* arg) } while (opcode != RRDENG_NOOP); } /* cleanup operations of the event loop */ + if (unlikely(wc->now_deleting.data)) { + info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); + } + info("Shutting down RRD engine event loop."); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); @@ -716,7 +771,20 @@ void rrdeng_worker(void* arg) uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ assert(0 == uv_loop_close(loop)); - free(loop); + freez(loop); + + return; + +error_after_timer_init: + uv_close((uv_handle_t *)&wc->async, NULL); +error_after_async_init: + assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + wc->error = UV_EAGAIN; + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); } @@ -726,19 +794,19 @@ static void basic_functional_test(struct rrdengine_instance *ctx) int i, j, failed_validations; uuid_t uuid[NR_PAGES]; void *buf; - struct rrdeng_page_cache_descr *handle[NR_PAGES]; - char uuid_str[37]; - char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */ + struct rrdeng_page_descr *handle[NR_PAGES]; + char uuid_str[UUID_STR_LEN]; + char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */ for (i = 0 ; i < NR_PAGES ; ++i) { uuid_generate(uuid[i]); uuid_unparse_lower(uuid[i], uuid_str); // fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); - buf = rrdeng_create_page(&uuid[i], &handle[i]); + buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]); /* Each page contains 10 times its own UUID stringified */ for (j = 0 ; j < 100 ; ++j) { - strcpy(buf + 37 * j, uuid_str); - strcpy(backup[i] + 37 * j, uuid_str); + strcpy(buf + UUID_STR_LEN * j, uuid_str); + strcpy(backup[i] + UUID_STR_LEN * j, uuid_str); } rrdeng_commit_page(ctx, handle[i], (Word_t)i); } @@ -750,7 +818,7 @@ static void basic_functional_test(struct rrdengine_instance *ctx) ++failed_validations; fprintf(stderr, "Page %d was LOST.\n", i); } - if (memcmp(backup[i], buf, 37 * 100)) { + if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) { ++failed_validations; fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); } |