summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/journalfile.c12
-rw-r--r--database/engine/rrdengine.c44
2 files changed, 37 insertions, 19 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 30eaa0ec..d6e4f317 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -3,13 +3,18 @@
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
- struct generic_io_descriptor *io_descr;
+ struct generic_io_descriptor *io_descr = req->data;
+ struct rrdengine_worker_config* wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
- fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ } else {
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
}
- io_descr = req->data;
uv_fs_req_cleanup(req);
free(io_descr->buf);
@@ -348,6 +353,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
ret = crc32cmp(jf_trailer->checksum, crc);
debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
if (unlikely(ret)) {
+ error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
return size_bytes;
}
switch (jf_header->type) {
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 0f2dceaa..221216bb 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -37,24 +37,29 @@ void read_extent_cb(uv_fs_t* req)
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
+ uint8_t have_read_error = 0;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
xt_io_descr = req->data;
- if (req->result < 0) {
- error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result));
- goto cleanup;
- }
-
header = xt_io_descr->buf;
payload_length = header->payload_length;
count = header->number_of_pages;
-
payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
-
trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
+
+ if (req->result < 0) {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ have_read_error = 1;
+ error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__,
+ uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ goto after_crc_check;
+ }
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
@@ -66,12 +71,17 @@ void read_extent_cb(uv_fs_t* req)
}
#endif
if (unlikely(ret)) {
- /* TODO: handle errors */
- exit(UV_EIO);
- goto cleanup;
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
+ have_read_error = 1;
+ error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
}
- if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+after_crc_check:
+ if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
uncompressed_payload_length = 0;
for (i = 0 ; i < count ; ++i) {
uncompressed_payload_length += header->descr[i].page_length;
@@ -99,7 +109,10 @@ void read_extent_cb(uv_fs_t* req)
page_offset += header->descr[j].page_length;
}
/* care, we don't hold the descriptor mutex */
- if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ if (have_read_error) {
+ /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
+ memset(page, 0, descr->page_length);
+ } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
(void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
@@ -118,12 +131,11 @@ void read_extent_cb(uv_fs_t* req)
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
- if (RRD_NO_COMPRESSION != header->compression_algorithm) {
+ if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
-cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
freez(xt_io_descr);
@@ -246,8 +258,9 @@ void flush_pages_cb(uv_fs_t* req)
xt_io_descr = req->data;
if (req->result < 0) {
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
- goto cleanup;
}
#ifdef NETDATA_INTERNAL_CHECKS
{
@@ -279,7 +292,6 @@ void flush_pages_cb(uv_fs_t* req)
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
-cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
freez(xt_io_descr);