diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 780 |
1 files changed, 780 insertions, 0 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c new file mode 100644 index 000000000..b8e4eba01 --- /dev/null +++ b/database/engine/rrdengine.c @@ -0,0 +1,780 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" + +void sanity_check(void) +{ + /* Magic numbers must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); + + /* Version strings must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); + + /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ + BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); + + BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ + + /* page count must fit in 8 bits */ + BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); +} + +void read_extent_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_cache_descr *descr; + int ret; + unsigned i, j, count; + void *page, *uncompressed_buf = NULL; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + xt_io_descr = req->data; + if (req->result < 0) { + error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result)); + goto cleanup; + } + + header = xt_io_descr->buf; + payload_length = header->payload_length; + count = header->number_of_pages; + + payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; + + trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); + ret = crc32cmp(trailer->checksum, crc); + datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + if (unlikely(ret)) { + /* TODO: handle errors */ + exit(UV_EIO); + goto cleanup; + } + + if (RRD_NO_COMPRESSION != header->compression_algorithm) { + uncompressed_payload_length = 0; + for (i = 0 ; i < count ; ++i) { + uncompressed_payload_length += header->descr[i].page_length; + } + uncompressed_buf = mallocz(uncompressed_payload_length); + ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, + payload_length, uncompressed_payload_length); + ctx->stats.before_decompress_bytes += payload_length; + ctx->stats.after_decompress_bytes += ret; + debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); + /* care, we don't hold the descriptor mutex */ + } + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0; j < count; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) && + header->descr[j].page_length == descr->page_length && + header->descr[j].start_time == descr->start_time && + header->descr[j].end_time == descr->end_time) { + break; + } + page_offset += header->descr[j].page_length; + } + /* care, we don't hold the descriptor mutex */ + if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); + } else { + (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); + } + pg_cache_replaceQ_insert(ctx, descr); + uv_mutex_lock(&descr->mutex); + descr->page = page; + descr->flags |= RRD_PAGE_POPULATED; + descr->flags &= ~RRD_PAGE_READ_PENDING; + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + if (xt_io_descr->release_descr) { + pg_cache_put_unsafe(descr); + } else { + pg_cache_wake_up_waiters_unsafe(descr); + } + uv_mutex_unlock(&descr->mutex); + } + if (RRD_NO_COMPRESSION != header->compression_algorithm) { + free(uncompressed_buf); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); +cleanup: + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + free(xt_io_descr); +} + + +static void do_read_extent(struct rrdengine_worker_config* wc, + struct rrdeng_page_cache_descr **descr, + unsigned count, + uint8_t release_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned i, size_bytes, pos, real_io_size; +// uint32_t payload_length; + struct extent_io_descriptor *xt_io_descr; + struct rrdengine_datafile *datafile; + + datafile = descr[0]->extent->datafile; + pos = descr[0]->extent->offset; + size_bytes = descr[0]->extent->size; + + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* free(xt_io_descr); + return;*/ + } + for (i = 0 ; i < count; ++i) { + uv_mutex_lock(&descr[i]->mutex); + descr[i]->flags |= RRD_PAGE_READ_PENDING; +// payload_length = descr[i]->page_length; + uv_mutex_unlock(&descr[i]->mutex); + + xt_io_descr->descr_array[i] = descr[i]; + } + xt_io_descr->descr_count = count; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = NULL; + /* xt_io_descr->descr_commit_idx_array[0] */ + xt_io_descr->release_descr = release_descr; + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + assert (-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; + ctx->stats.io_read_extent_bytes += real_io_size; + ++ctx->stats.io_read_extents; + ctx->stats.pg_cache_backfills += count; +} + +static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) +{ + struct rrdengine_instance *ctx = wc->ctx; + unsigned count, payload_length, descr_size, size_bytes; + void *buf; + /* persistent structures */ + struct rrdeng_df_extent_header *df_header; + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_store_data *jf_metric_data; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + df_header = xt_io_descr->buf; + count = df_header->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + + buf = wal_get_transaction_buffer(wc, size_bytes); + + jf_header = buf; + jf_header->type = STORE_DATA; + jf_header->reserved = 0; + jf_header->id = ctx->commit_log.transaction_id++; + jf_header->payload_length = payload_length; + + jf_metric_data = buf + sizeof(*jf_header); + jf_metric_data->extent_offset = xt_io_descr->pos; + jf_metric_data->extent_size = xt_io_descr->bytes; + jf_metric_data->number_of_pages = count; + memcpy(jf_metric_data->descr, df_header->descr, descr_size); + + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + crc32set(jf_trailer->checksum, crc); +} + +static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) +{ + switch (type) { + case STORE_DATA: + commit_data_extent(wc, (struct extent_io_descriptor *)data); + break; + default: + assert(type == STORE_DATA); + break; + } +} + +void flush_pages_cb(uv_fs_t* req) +{ + struct rrdengine_worker_config* wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + struct extent_io_descriptor *xt_io_descr; + struct rrdeng_page_cache_descr *descr; + struct rrdengine_datafile *datafile; + int ret; + unsigned i, count; + Word_t commit_id; + + xt_io_descr = req->data; + if (req->result < 0) { + error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + goto cleanup; + } + datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + + count = xt_io_descr->descr_count; + for (i = 0 ; i < count ; ++i) { + /* care, we don't hold the descriptor mutex */ + descr = xt_io_descr->descr_array[i]; + + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); + commit_id = xt_io_descr->descr_commit_idx_array[i]; + ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, commit_id, PJE0); + assert(1 == ret); + --pg_cache->commited_page_index.nr_commited_pages; + uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); + + pg_cache_replaceQ_insert(ctx, descr); + + uv_mutex_lock(&descr->mutex); + descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); + /* wake up waiters, care no reference being held */ + pg_cache_wake_up_waiters_unsafe(descr); + uv_mutex_unlock(&descr->mutex); + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); +cleanup: + uv_fs_req_cleanup(req); + free(xt_io_descr->buf); + free(xt_io_descr); +} + +/* + * completion must be NULL or valid. + * Returns 0 when no flushing can take place. + * Returns datafile bytes to be written on successful flushing initiation. + */ +static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + int compressed_size, max_compressed_size = 0; + unsigned i, count, size_bytes, pos, real_io_size; + uint32_t uncompressed_payload_length, payload_offset; + struct rrdeng_page_cache_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *xt_io_descr; + void *compressed_buf = NULL; + Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; + Pvoid_t *PValue; + Word_t Index; + uint8_t compression_algorithm = ctx->global_compress_alg; + struct extent_info *extent; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + if (force) { + debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); + } + uv_rwlock_rdlock(&pg_cache->commited_page_index.lock); + for (Index = 0, count = 0, uncompressed_payload_length = 0, + PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue ; + + descr != NULL && count != MAX_PAGES_PER_EXTENT ; + + PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + assert(0 != descr->page_length); + + uv_mutex_lock(&descr->mutex); + if (!(descr->flags & RRD_PAGE_WRITE_PENDING)) { + /* care, no reference being held */ + descr->flags |= RRD_PAGE_WRITE_PENDING; + uncompressed_payload_length += descr->page_length; + descr_commit_idx_array[count] = Index; + eligible_pages[count++] = descr; + } + uv_mutex_unlock(&descr->mutex); + } + uv_rwlock_rdunlock(&pg_cache->commited_page_index.lock); + + if (!count) { + debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); + if (completion) + complete(completion); + return 0; + } + xt_io_descr = mallocz(sizeof(*xt_io_descr)); + payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); + break; + default: /* Compress */ + assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + max_compressed_size = LZ4_compressBound(uncompressed_payload_length); + compressed_buf = mallocz(max_compressed_size); + size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); + break; + } + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* free(xt_io_descr);*/ + } + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_cache_descr *) * count); + xt_io_descr->descr_count = count; + + pos = 0; + header = xt_io_descr->buf; + header->compression_algorithm = compression_algorithm; + header->number_of_pages = count; + pos += sizeof(*header); + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ + extent->offset = datafile->pos; + extent->number_of_pages = count; + extent->datafile = datafile; + extent->next = NULL; + + for (i = 0 ; i < count ; ++i) { + /* This is here for performance reasons */ + xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; + + descr = xt_io_descr->descr_array[i]; + header->descr[i].type = PAGE_METRICS; + uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); + header->descr[i].page_length = descr->page_length; + header->descr[i].start_time = descr->start_time; + header->descr[i].end_time = descr->end_time; + pos += sizeof(header->descr[i]); + } + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + /* care, we don't hold the descriptor mutex */ + (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + descr->extent = extent; + extent->pages[i] = descr; + + pos += descr->page_length; + } + df_extent_insert(extent); + + switch (compression_algorithm) { + case RRD_NO_COMPRESSION: + header->payload_length = uncompressed_payload_length; + break; + default: /* Compress */ + compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, + uncompressed_payload_length, max_compressed_size); + ctx->stats.before_compress_bytes += uncompressed_payload_length; + ctx->stats.after_compress_bytes += compressed_size; + debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); + (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); + free(compressed_buf); + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + header->payload_length = compressed_size; + break; + } + extent->size = size_bytes; + xt_io_descr->bytes = size_bytes; + xt_io_descr->pos = datafile->pos; + xt_io_descr->req.data = xt_io_descr; + xt_io_descr->completion = completion; + + trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); + crc32set(trailer->checksum, crc); + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); + assert (-1 != ret); + ctx->stats.io_write_bytes += real_io_size; + ++ctx->stats.io_write_requests; + ctx->stats.io_write_extent_bytes += real_io_size; + ++ctx->stats.io_write_extents; + do_commit_transaction(wc, STORE_DATA, xt_io_descr); + datafile->pos += ALIGN_BYTES_CEILING(size_bytes); + ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); + rrdeng_test_quota(wc); + + return ALIGN_BYTES_CEILING(size_bytes); +} + +static void after_delete_old_data(uv_work_t *req, int status) +{ + struct rrdengine_instance *ctx = req->data; + struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + unsigned bytes; + + (void)status; + datafile = ctx->datafiles.first; + journalfile = datafile->journalfile; + bytes = datafile->pos + journalfile->pos; + + datafile_list_delete(ctx, datafile); + destroy_journal_file(journalfile, datafile); + destroy_data_file(datafile); + info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + datafile->tier, datafile->fileno); + free(journalfile); + free(datafile); + + ctx->disk_space -= bytes; + info("Reclaimed %u bytes of disk space.", bytes); + + /* unfreeze command processing */ + wc->now_deleting.data = NULL; + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); +} + +static void delete_old_data(uv_work_t *req) +{ + struct rrdengine_instance *ctx = req->data; + struct rrdengine_datafile *datafile; + struct extent_info *extent, *next; + struct rrdeng_page_cache_descr *descr; + unsigned count, i; + + /* Safe to use since it will be deleted after we are done */ + datafile = ctx->datafiles.first; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next) { + count = extent->number_of_pages; + for (i = 0 ; i < count ; ++i) { + descr = extent->pages[i]; + pg_cache_punch_hole(ctx, descr); + } + next = extent->next; + free(extent); + } +} + +void rrdeng_test_quota(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct rrdengine_datafile *datafile; + unsigned current_size, target_size; + uint8_t out_of_space, only_one_datafile; + + out_of_space = 0; + if (unlikely(ctx->disk_space > ctx->max_disk_space)) { + out_of_space = 1; + } + datafile = ctx->datafiles.last; + current_size = datafile->pos; + target_size = ctx->max_disk_space / TARGET_DATAFILES; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; + if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { + /* Finalize data and journal file and create a new pair */ + wal_flush_transaction_buffer(wc); + create_new_datafile_pair(ctx, 1, datafile->fileno + 1); + } + if (unlikely(out_of_space)) { + /* delete old data */ + if (wc->now_deleting.data) { + /* already deleting data */ + return; + } + info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", + ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + wc->now_deleting.data = ctx; + uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data); + } +} + +int init_rrd_files(struct rrdengine_instance *ctx) +{ + return init_data_files(ctx); +} + +void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) +{ + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + wc->queue_size = 0; + assert(0 == uv_cond_init(&wc->cmd_cond)); + assert(0 == uv_mutex_init(&wc->cmd_mutex)); +} + +void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { + uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); + } + assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); +} + +struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) +{ + struct rrdeng_cmd ret; + unsigned queue_size; + + uv_mutex_lock(&wc->cmd_mutex); + queue_size = wc->queue_size; + if (queue_size == 0) { + ret.opcode = RRDENG_NOOP; + } else { + /* dequeue command */ + ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; + if (queue_size == 1) { + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + } else { + wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.head + 1 : 0; + } + wc->queue_size = queue_size - 1; + + /* wake up producers */ + uv_cond_signal(&wc->cmd_cond); + } + uv_mutex_unlock(&wc->cmd_mutex); + + return ret; +} + +void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); +} + +void timer_cb(uv_timer_t* handle) +{ + struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; + + uv_stop(handle->loop); + uv_update_time(handle->loop); + rrdeng_test_quota(wc); + debug(D_RRDENGINE, "%s: timeout reached.", __func__); + if (likely(!wc->now_deleting.data)) { + unsigned total_bytes, bytes_written; + + /* There is free space so we can write to disk */ + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; + bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } + } +#ifdef NETDATA_INTERNAL_CHECKS + { + char buf[4096]; + debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf))); + } +#endif +} + +/* Flushes dirty pages when timer expires */ +#define TIMER_PERIOD_MS (1000) + +#define CMD_BATCH_SIZE (256) + +void rrdeng_worker(void* arg) +{ + struct rrdengine_worker_config* wc = arg; + struct rrdengine_instance *ctx = wc->ctx; + uv_loop_t* loop; + int shutdown; + enum rrdeng_opcode opcode; + uv_timer_t timer_req; + struct rrdeng_cmd cmd; + + rrdeng_init_cmd_queue(wc); + + loop = wc->loop = mallocz(sizeof(uv_loop_t)); + uv_loop_init(loop); + loop->data = wc; + + uv_async_init(wc->loop, &wc->async, async_cb); + wc->async.data = wc; + + wc->now_deleting.data = NULL; + + /* dirty page flushing timer */ + uv_timer_init(loop, &timer_req); + timer_req.data = wc; + + /* wake up initialization thread */ + complete(&ctx->rrdengine_completion); + + uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS); + shutdown = 0; + while (shutdown == 0 || uv_loop_alive(loop)) { + uv_run(loop, UV_RUN_DEFAULT); + /* wait for commands */ + do { + cmd = rrdeng_deq_cmd(wc); + opcode = cmd.opcode; + + switch (opcode) { + case RRDENG_NOOP: + /* the command queue was empty, do nothing */ + break; + case RRDENG_SHUTDOWN: + shutdown = 1; + if (unlikely(wc->now_deleting.data)) { + /* postpone shutdown until after deletion */ + info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); + rrdeng_enq_cmd(wc, &cmd); + break; + } + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + assert(0 == uv_timer_stop(&timer_req)); + uv_close((uv_handle_t *)&timer_req, NULL); + info("Shutting down RRD engine event loop."); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all commited pages. */ + } + break; + case RRDENG_READ_PAGE: + do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); + break; + case RRDENG_READ_EXTENT: + do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); + break; + case RRDENG_COMMIT_PAGE: + do_commit_transaction(wc, STORE_DATA, NULL); + break; + case RRDENG_FLUSH_PAGES: { + unsigned total_bytes, bytes_written; + + /* First I/O should be enough to call completion */ + bytes_written = do_flush_pages(wc, 1, cmd.completion); + for (total_bytes = bytes_written ; + bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ; + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 1, NULL); + } + break; + } + default: + debug(D_RRDENGINE, "%s: default.", __func__); + break; + } + } while (opcode != RRDENG_NOOP); + } + /* cleanup operations of the event loop */ + wal_flush_transaction_buffer(wc); + uv_run(loop, UV_RUN_DEFAULT); + + info("Shutting down RRD engine event loop complete."); + /* TODO: don't let the API block by waiting to enqueue commands */ + uv_cond_destroy(&wc->cmd_cond); +/* uv_mutex_destroy(&wc->cmd_mutex); */ + assert(0 == uv_loop_close(loop)); + free(loop); +} + + +#define NR_PAGES (256) +static void basic_functional_test(struct rrdengine_instance *ctx) +{ + int i, j, failed_validations; + uuid_t uuid[NR_PAGES]; + void *buf; + struct rrdeng_page_cache_descr *handle[NR_PAGES]; + char uuid_str[37]; + char backup[NR_PAGES][37 * 100]; /* backup storage for page data verification */ + + for (i = 0 ; i < NR_PAGES ; ++i) { + uuid_generate(uuid[i]); + uuid_unparse_lower(uuid[i], uuid_str); +// fprintf(stderr, "Generated uuid[%d]=%s\n", i, uuid_str); + buf = rrdeng_create_page(&uuid[i], &handle[i]); + /* Each page contains 10 times its own UUID stringified */ + for (j = 0 ; j < 100 ; ++j) { + strcpy(buf + 37 * j, uuid_str); + strcpy(backup[i] + 37 * j, uuid_str); + } + rrdeng_commit_page(ctx, handle[i], (Word_t)i); + } + fprintf(stderr, "\n********** CREATED %d METRIC PAGES ***********\n\n", NR_PAGES); + failed_validations = 0; + for (i = 0 ; i < NR_PAGES ; ++i) { + buf = rrdeng_get_latest_page(ctx, &uuid[i], (void **)&handle[i]); + if (NULL == buf) { + ++failed_validations; + fprintf(stderr, "Page %d was LOST.\n", i); + } + if (memcmp(backup[i], buf, 37 * 100)) { + ++failed_validations; + fprintf(stderr, "Page %d data comparison with backup FAILED validation.\n", i); + } + rrdeng_put_page(ctx, handle[i]); + } + fprintf(stderr, "\n********** CORRECTLY VALIDATED %d/%d METRIC PAGES ***********\n\n", + NR_PAGES - failed_validations, NR_PAGES); + +} +/* C entry point for development purposes + * make "LDFLAGS=-errdengine_main" + */ +void rrdengine_main(void) +{ + int ret; + struct rrdengine_instance *ctx; + + ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + if (ret) { + exit(ret); + } + basic_functional_test(ctx); + + rrdeng_exit(ctx); + fprintf(stderr, "Hello world!"); + exit(0); +}
\ No newline at end of file |