// SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "rrdengine.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); /* Version strings must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); /* page info scratch space must be able to hold 2 32-bit integers */ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; unsigned i, j, count; void *page, *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; uint8_t have_read_error = 0; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; uLong crc; xt_io_descr = req->data; header = xt_io_descr->buf; payload_length = header->payload_length; count = header->number_of_pages; payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); if (req->result < 0) { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); have_read_error = 1; error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); goto after_crc_check; } crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); ret = crc32cmp(trailer->checksum, crc); #ifdef NETDATA_INTERNAL_CHECKS { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); } #endif if (unlikely(ret)) { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); have_read_error = 1; error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); } after_crc_check: if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { uncompressed_payload_length = 0; for (i = 0 ; i < count ; ++i) { uncompressed_payload_length += header->descr[i].page_length; } uncompressed_buf = mallocz(uncompressed_payload_length); ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, payload_length, uncompressed_payload_length); ctx->stats.before_decompress_bytes += payload_length; ctx->stats.after_decompress_bytes += ret; debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); /* care, we don't hold the descriptor mutex */ } for (i = 0 ; i < xt_io_descr->descr_count; ++i) { page = mallocz(RRDENG_BLOCK_SIZE); descr = xt_io_descr->descr_array[i]; for (j = 0, page_offset = 0; j < count; ++j) { /* care, we don't hold the descriptor mutex */ if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) && header->descr[j].page_length == descr->page_length && header->descr[j].start_time == descr->start_time && header->descr[j].end_time == descr->end_time) { break; } page_offset += header->descr[j].page_length; } /* care, we don't hold the descriptor mutex */ if (have_read_error) { /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ memset(page, 0, descr->page_length); } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); } else { (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); } pg_cache_replaceQ_insert(ctx, descr); rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; pg_cache_descr->flags |= RRD_PAGE_POPULATED; pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); if (xt_io_descr->release_descr) { pg_cache_put_unsafe(descr); } else { pg_cache_wake_up_waiters_unsafe(descr); } rrdeng_page_descr_mutex_unlock(ctx, descr); } if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { freez(uncompressed_buf); } if (xt_io_descr->completion) complete(xt_io_descr->completion); uv_fs_req_cleanup(req); free(xt_io_descr->buf); freez(xt_io_descr); } static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdeng_page_descr **descr, unsigned count, uint8_t release_descr) { struct rrdengine_instance *ctx = wc->ctx; struct page_cache_descr *pg_cache_descr; int ret; unsigned i, size_bytes, pos, real_io_size; // uint32_t payload_length; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; datafile = descr[0]->extent->datafile; pos = descr[0]->extent->offset; size_bytes = descr[0]->extent->size; xt_io_descr = mallocz(sizeof(*xt_io_descr)); ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); /* freez(xt_io_descr); return;*/ } for (i = 0 ; i < count; ++i) { rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; // payload_length = descr[i]->page_length; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); xt_io_descr->descr_array[i] = descr[i]; } xt_io_descr->descr_count = count; xt_io_descr->bytes = size_bytes; xt_io_descr->pos = pos; xt_io_descr->req.data = xt_io_descr; xt_io_descr->completion = NULL; /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); assert (-1 != ret); ctx->stats.io_read_bytes += real_io_size; ++ctx->stats.io_read_requests; ctx->stats.io_read_extent_bytes += real_io_size; ++ctx->stats.io_read_extents; ctx->stats.pg_cache_backfills += count; } static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) { struct rrdengine_instance *ctx = wc->ctx; unsigned count, payload_length, descr_size, size_bytes; void *buf; /* persistent structures */ struct rrdeng_df_extent_header *df_header; struct rrdeng_jf_transaction_header *jf_header; struct rrdeng_jf_store_data *jf_metric_data; struct rrdeng_jf_transaction_trailer *jf_trailer; uLong crc; df_header = xt_io_descr->buf; count = df_header->number_of_pages; descr_size = sizeof(*jf_metric_data->descr) * count; payload_length = sizeof(*jf_metric_data) + descr_size; size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); buf = wal_get_transaction_buffer(wc, size_bytes); jf_header = buf; jf_header->type = STORE_DATA; jf_header->reserved = 0; jf_header->id = ctx->commit_log.transaction_id++; jf_header->payload_length = payload_length; jf_metric_data = buf + sizeof(*jf_header); jf_metric_data->extent_offset = xt_io_descr->pos; jf_metric_data->extent_size = xt_io_descr->bytes; jf_metric_data->number_of_pages = count; memcpy(jf_metric_data->descr, df_header->descr, descr_size); jf_trailer = buf + sizeof(*jf_header) + payload_length; crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); crc32set(jf_trailer->checksum, crc); } static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) { switch (type) { case STORE_DATA: commit_data_extent(wc, (struct extent_io_descriptor *)data); break; default: assert(type == STORE_DATA); break; } } void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; struct page_cache *pg_cache = &ctx->pg_cache; struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; unsigned i, count; Word_t commit_id; xt_io_descr = req->data; if (req->result < 0) { ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); } #ifdef NETDATA_INTERNAL_CHECKS { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); } #endif count = xt_io_descr->descr_count; for (i = 0 ; i < count ; ++i) { /* care, we don't hold the descriptor mutex */ descr = xt_io_descr->descr_array[i]; uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); commit_id = xt_io_descr->descr_commit_idx_array[i]; ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0); assert(1 == ret); --pg_cache->commited_page_index.nr_commited_pages; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); pg_cache_replaceQ_insert(ctx, descr); rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); /* wake up waiters, care no reference being held */ pg_cache_wake_up_waiters_unsafe(descr); rrdeng_page_descr_mutex_unlock(ctx, descr); } if (xt_io_descr->completion) complete(xt_io_descr->completion); uv_fs_req_cleanup(req); free(xt_io_descr->buf); freez(xt_io_descr); } /* * completion must be NULL or valid. * Returns 0 when no flushing can take place. * Returns datafile bytes to be written on successful flushing initiation. */ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) { struct rrdengine_instance *ctx = wc->ctx; struct page_cache *pg_cache = &ctx->pg_cache; int ret; int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; uint32_t uncompressed_payload_length, payload_offset; struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct page_cache_descr *pg_cache_descr; struct extent_io_descriptor *xt_io_descr; void *compressed_buf = NULL; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; Pvoid_t *PValue; Word_t Index; uint8_t compression_algorithm = ctx->global_compress_alg; struct extent_info *extent; struct rrdengine_datafile *datafile; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; uLong crc; if (force) { debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); } uv_rwlock_rdlock(&pg_cache->commited_page_index.lock); for (Index = 0, count = 0, uncompressed_payload_length = 0, PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue ; descr != NULL && count != MAX_PAGES_PER_EXTENT ; PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { assert(0 != descr->page_length); rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { /* care, no reference being held */ pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; uncompressed_payload_length += descr->page_length; descr_commit_idx_array[count] = Index; eligible_pages[count++] = descr; } rrdeng_page_descr_mutex_unlock(ctx, descr); } uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); if (!count) { debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); if (completion) complete(completion); return 0; } xt_io_descr = mallocz(sizeof(*xt_io_descr)); payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); switch (compression_algorithm) { case RRD_NO_COMPRESSION: size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); break; default: /* Compress */ assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); max_compressed_size = LZ4_compressBound(uncompressed_payload_length); compressed_buf = mallocz(max_compressed_size); size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); break; } ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); /* freez(xt_io_descr);*/ } (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; pos = 0; header = xt_io_descr->buf; header->compression_algorithm = compression_algorithm; header->number_of_pages = count; pos += sizeof(*header); extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ extent->offset = datafile->pos; extent->number_of_pages = count; extent->datafile = datafile; extent->next = NULL; for (i = 0 ; i < count ; ++i) { /* This is here for performance reasons */ xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; descr = xt_io_descr->descr_array[i]; header->descr[i].type = PAGE_METRICS; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time = descr->start_time; header->descr[i].end_time = descr->end_time; pos += sizeof(header->descr[i]); } for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; /* care, we don't hold the descriptor mutex */ (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); descr->extent = extent; extent->pages[i] = descr; pos += descr->page_length; } df_extent_insert(extent); switch (compression_algorithm) { case RRD_NO_COMPRESSION: header->payload_length = uncompressed_payload_length; break; default: /* Compress */ compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, uncompressed_payload_length, max_compressed_size); ctx->stats.before_compress_bytes += uncompressed_payload_length; ctx->stats.after_compress_bytes += compressed_size; debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); freez(compressed_buf); size_bytes = payload_offset + compressed_size + sizeof(*trailer); header->payload_length = compressed_size; break; } extent->size = size_bytes; xt_io_descr->bytes = size_bytes; xt_io_descr->pos = datafile->pos; xt_io_descr->req.data = xt_io_descr; xt_io_descr->completion = completion; trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); crc32set(trailer->checksum, crc); real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); assert (-1 != ret); ctx->stats.io_write_bytes += real_io_size; ++ctx->stats.io_write_requests; ctx->stats.io_write_extent_bytes += real_io_size; ++ctx->stats.io_write_extents; do_commit_transaction(wc, STORE_DATA, xt_io_descr); datafile->pos += ALIGN_BYTES_CEILING(size_bytes); ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); rrdeng_test_quota(wc); return ALIGN_BYTES_CEILING(size_bytes); } static void after_delete_old_data(uv_work_t *req, int status) { struct rrdengine_instance *ctx = req->data; struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; unsigned deleted_bytes, journalfile_bytes, datafile_bytes; int ret; char path[RRDENG_PATH_MAX]; (void)status; datafile = ctx->datafiles.first; journalfile = datafile->journalfile; datafile_bytes = datafile->pos; journalfile_bytes = journalfile->pos; deleted_bytes = 0; info("Deleting data and journal file pair."); datafile_list_delete(ctx, datafile); ret = destroy_journal_file(journalfile, datafile); if (!ret) { generate_journalfilepath(datafile, path, sizeof(path)); info("Deleted journal file \"%s\".", path); deleted_bytes += journalfile_bytes; } ret = destroy_data_file(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); info("Deleted data file \"%s\".", path); deleted_bytes += datafile_bytes; } freez(journalfile); freez(datafile); ctx->disk_space -= deleted_bytes; info("Reclaimed %u bytes of disk space.", deleted_bytes); /* unfreeze command processing */ wc->now_deleting.data = NULL; /* wake up event loop */ assert(0 == uv_async_send(&wc->async)); } static void delete_old_data(uv_work_t *req) { struct rrdengine_instance *ctx = req->data; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; struct rrdeng_page_descr *descr; unsigned count, i; /* Safe to use since it will be deleted after we are done */ datafile = ctx->datafiles.first; for (extent = datafile->extents.first ; extent != NULL ; extent = next) { count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; pg_cache_punch_hole(ctx, descr, 0); } next = extent->next; freez(extent); } } void rrdeng_test_quota(struct rrdengine_worker_config* wc) { struct rrdengine_instance *ctx = wc->ctx; struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; int ret; out_of_space = 0; if (unlikely(ctx->disk_space > ctx->max_disk_space)) { out_of_space = 1; } datafile = ctx->datafiles.last; current_size = datafile->pos; target_size = ctx->max_disk_space / TARGET_DATAFILES; target_size = MIN(target_size, MAX_DATAFILE_SIZE); target_size = MAX(target_size, MIN_DATAFILE_SIZE); only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { /* Finalize data and journal file and create a new pair */ wal_flush_transaction_buffer(wc); ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); if (likely(!ret)) { ++ctx->last_fileno; } } if (unlikely(out_of_space)) { /* delete old data */ if (wc->now_deleting.data) { /* already deleting data */ return; } if (NULL == ctx->datafiles.first->next) { error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" " to reclaim space, there are no other file pairs left.", ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); return; } info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); wc->now_deleting.data = ctx; assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); } } /* return 0 on success */ int init_rrd_files(struct rrdengine_instance *ctx) { return init_data_files(ctx); } void finalize_rrd_files(struct rrdengine_instance *ctx) { return finalize_data_files(ctx); } void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; wc->queue_size = 0; assert(0 == uv_cond_init(&wc->cmd_cond)); assert(0 == uv_mutex_init(&wc->cmd_mutex)); } void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) { unsigned queue_size; /* wait for free space in queue */ uv_mutex_lock(&wc->cmd_mutex); while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); } assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); /* enqueue command */ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? wc->cmd_queue.tail + 1 : 0; wc->queue_size = queue_size + 1; uv_mutex_unlock(&wc->cmd_mutex); /* wake up event loop */ assert(0 == uv_async_send(&wc->async)); } struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) { struct rrdeng_cmd ret; unsigned queue_size; uv_mutex_lock(&wc->cmd_mutex); queue_size = wc->queue_size; if (queue_size == 0) { ret.opcode = RRDENG_NOOP; } else { /* dequeue command */ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; if (queue_size == 1) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; } else { wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? wc->cmd_queue.head + 1 : 0; } wc->queue_size = queue_size - 1; /* wake up producers */ uv_cond_signal(&wc->cmd_cond); } uv_mutex_unlock(&wc->cmd_mutex); return ret; } void async_cb(uv_async_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; uv_stop(handle->loop); uv_update_time(handle->loop); rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); if (likely(!wc->now_deleting.data)) { unsigned total_bytes, bytes_written; /* There is free space so we can write to disk */ debug(D_RRDENGINE, "Flushing pages to disk."); for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; total_bytes += bytes_written) { bytes_written = do_flush_pages(wc, 0, NULL); } } #ifdef NETDATA_INTERNAL_CHECKS { char buf[4096]; debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); } #endif } /* Flushes dirty pages when timer expires */ #define TIMER_PERIOD_MS (1000) #define CMD_BATCH_SIZE (256) void rrdeng_worker(void* arg) { struct rrdengine_worker_config* wc = arg; struct rrdengine_instance *ctx = wc->ctx; uv_loop_t* loop; int shutdown, ret; enum rrdeng_opcode opcode; uv_timer_t timer_req; struct rrdeng_cmd cmd; rrdeng_init_cmd_queue(wc); loop = wc->loop = mallocz(sizeof(uv_loop_t)); ret = uv_loop_init(loop); if (ret) { error("uv_loop_init(): %s", uv_strerror(ret)); goto error_after_loop_init; } loop->data = wc; ret = uv_async_init(wc->loop, &wc->async, async_cb); if (ret) { error("uv_async_init(): %s", uv_strerror(ret)); goto error_after_async_init; } wc->async.data = wc; wc->now_deleting.data = NULL; /* dirty page flushing timer */ ret = uv_timer_init(loop, &timer_req); if (ret) { error("uv_timer_init(): %s", uv_strerror(ret)); goto error_after_timer_init; } timer_req.data = wc; wc->error = 0; /* wake up initialization thread */ complete(&ctx->rrdengine_completion); assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; while (shutdown == 0 || uv_loop_alive(loop)) { uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ do { cmd = rrdeng_deq_cmd(wc); opcode = cmd.opcode; switch (opcode) { case RRDENG_NOOP: /* the command queue was empty, do nothing */ break; case RRDENG_SHUTDOWN: shutdown = 1; /* * uv_async_send after uv_close does not seem to crash in linux at the moment, * it is however undocumented behaviour and we need to be aware if this becomes * an issue in the future. */ uv_close((uv_handle_t *)&wc->async, NULL); assert(0 == uv_timer_stop(&timer_req)); uv_close((uv_handle_t *)&timer_req, NULL); break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); break; case RRDENG_READ_EXTENT: do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); break; case RRDENG_COMMIT_PAGE: do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { unsigned bytes_written; /* First I/O should be enough to call completion */ bytes_written = do_flush_pages(wc, 1, cmd.completion); if (bytes_written) { while (do_flush_pages(wc, 1, NULL)) { ; /* Force flushing of all commited pages. */ } } break; } default: debug(D_RRDENGINE, "%s: default.", __func__); break; } } while (opcode != RRDENG_NOOP); } /* cleanup operations of the event loop */ if (unlikely(wc->now_deleting.data)) { info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); } info("Shutting down RRD engine event loop."); while (do_flush_pages(wc, 1, NULL)) { ; /* Force flushing of all commited pages. */ } wal_flush_transaction_buffer(wc); uv_run(loop, UV_RUN_DEFAULT); info("Shutting down RRD engine event loop complete."); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ assert(0 == uv_loop_close(loop)); freez(loop); return; error_after_timer_init: uv_close((uv_handle_t *)&wc->async, NULL); error_after_async_init: assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); wc->error = UV_EAGAIN; /* wake up initialization thread */ complete(&ctx->rrdengine_completion); } #define NR_PAGES (256) static void basic_functional_test(struct rrdengine_instance *ctx) { int i, j, failed_validations; uuid_t uuid[NR_PAGES]; void *buf; struct rrdeng_page_descr *handle[NR_PAGES]; char uuid_str[UUID_STR_LEN]; char backup[NR_PAGES][UUID_STR_LEN * 100]; /* backup storage for page data verification */ for (i = 0 ; i < NR_PAGES ; ++i) { uuid_generate(uuid[i]); uuid_unparse_lower(uuid[i], uuid_str); // fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); buf = rrdeng_create_page(ctx, &uuid[i], &handle[i]); /* Each page contains 10 times its own UUID stringified */ for (j = 0 ; j < 100 ; ++j) { strcpy(buf + UUID_STR_LEN * j, uuid_str); strcpy(backup[i] + UUID_STR_LEN * j, uuid_str); } rrdeng_commit_page(ctx, handle[i], (Word_t)i); } fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES); failed_validations = 0; for (i = 0 ; i < NR_PAGES ; ++i) { buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]); if (NULL == buf) { ++failed_validations; fprintf(stderr, "Page %d was LOST.\n", i); } if (memcmp(backup[i], buf, UUID_STR_LEN * 100)) { ++failed_validations; fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); } rrdeng_put_page(ctx, handle[i]); } fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n", NR_PAGES - failed_validations, NR_PAGES); } /* C entry point for development purposes * make "LDFLAGS=-errdengine_main" */ void rrdengine_main(void) { int ret; struct rrdengine_instance *ctx; ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); if (ret) { exit(ret); } basic_functional_test(ctx); rrdeng_exit(ctx); fprintf(stderr, "Hello world!"); exit(0); }