diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/journalfile.c | 12 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 44 |
2 files changed, 37 insertions, 19 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 30eaa0ec6..d6e4f3174 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -3,13 +3,18 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) { - struct generic_io_descriptor *io_descr; + struct generic_io_descriptor *io_descr = req->data; + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); if (req->result < 0) { - fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } else { + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); } - io_descr = req->data; uv_fs_req_cleanup(req); free(io_descr->buf); @@ -348,6 +353,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng ret = crc32cmp(jf_trailer->checksum, crc); debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); if (unlikely(ret)) { + error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); return size_bytes; } switch (jf_header->type) { diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 0f2dceaa4..221216bb3 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -37,24 +37,29 @@ void read_extent_cb(uv_fs_t* req) unsigned i, j, count; void *page, *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; + uint8_t have_read_error = 0; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; uLong crc; xt_io_descr = req->data; - if (req->result < 0) { - error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result)); - goto cleanup; - } - header = xt_io_descr->buf; payload_length = header->payload_length; count = header->number_of_pages; - payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; - trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); + + if (req->result < 0) { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, + uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + goto after_crc_check; + } crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); ret = crc32cmp(trailer->checksum, crc); @@ -66,12 +71,17 @@ void read_extent_cb(uv_fs_t* req) } #endif if (unlikely(ret)) { - /* TODO: handle errors */ - exit(UV_EIO); - goto cleanup; + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); + have_read_error = 1; + error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); } - if (RRD_NO_COMPRESSION != header->compression_algorithm) { +after_crc_check: + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { uncompressed_payload_length = 0; for (i = 0 ; i < count ; ++i) { uncompressed_payload_length += header->descr[i].page_length; @@ -99,7 +109,10 @@ void read_extent_cb(uv_fs_t* req) page_offset += header->descr[j].page_length; } /* care, we don't hold the descriptor mutex */ - if (RRD_NO_COMPRESSION == header->compression_algorithm) { + if (have_read_error) { + /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ + memset(page, 0, descr->page_length); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); } else { (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); @@ -118,12 +131,11 @@ void read_extent_cb(uv_fs_t* req) } rrdeng_page_descr_mutex_unlock(ctx, descr); } - if (RRD_NO_COMPRESSION != header->compression_algorithm) { + if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { freez(uncompressed_buf); } if (xt_io_descr->completion) complete(xt_io_descr->completion); -cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); freez(xt_io_descr); @@ -246,8 +258,9 @@ void flush_pages_cb(uv_fs_t* req) xt_io_descr = req->data; if (req->result < 0) { + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); - goto cleanup; } #ifdef NETDATA_INTERNAL_CHECKS { @@ -279,7 +292,6 @@ void flush_pages_cb(uv_fs_t* req) } if (xt_io_descr->completion) complete(xt_io_descr->completion); -cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); freez(xt_io_descr); |