diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 587 |
1 files changed, 499 insertions, 88 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index b6b6548e..43135ff0 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -6,9 +6,10 @@ rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; -rrdeng_stats_t global_flushing_errors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; -void sanity_check(void) +static void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); @@ -26,10 +27,188 @@ void sanity_check(void) /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + /* extent cache count must fit in 32 bits */ + BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); + /* page info scratch space must be able to hold 2 32-bit integers */ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } +/* always inserts into tail */ +static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + + xt_cache_elem->prev = NULL; + xt_cache_elem->next = NULL; + + if (likely(NULL != xt_cache->replaceQ_tail)) { + xt_cache_elem->prev = xt_cache->replaceQ_tail; + xt_cache->replaceQ_tail->next = xt_cache_elem; + } + if (unlikely(NULL == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = xt_cache_elem; + } + xt_cache->replaceQ_tail = xt_cache_elem; +} + +static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *prev, *next; + + prev = xt_cache_elem->prev; + next = xt_cache_elem->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = next; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { + xt_cache->replaceQ_tail = prev; + } + xt_cache_elem->prev = xt_cache_elem->next = NULL; +} + +static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_replaceQ_insert(wc, xt_cache_elem); +} + +/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ +static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned idx; + int ret; + + ret = find_first_zero(xt_cache->allocation_bitmap); + if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { + for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { + idx = xt_cache_elem - xt_cache->extent_array; + if (!check_bit(xt_cache->inflight_bitmap, idx)) { + xt_cache_replaceQ_delete(wc, xt_cache_elem); + break; + } + } + if (NULL == xt_cache_elem) + return -1; + } else { + idx = (unsigned)ret; + xt_cache_elem = &xt_cache->extent_array[idx]; + } + xt_cache_elem->extent = extent; + xt_cache_elem->fileno = extent->datafile->fileno; + xt_cache_elem->inflight_io_descr = NULL; + xt_cache_replaceQ_insert(wc, xt_cache_elem); + modify_bit(&xt_cache->allocation_bitmap, idx, 1); + + return (int)idx; +} + +/** + * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. + * Sets *idx to point to the position of the extent inside the cache. + **/ +static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned i; + + for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { + xt_cache_elem = &xt_cache->extent_array[i]; + if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && + xt_cache_elem->fileno == extent->datafile->fileno) { + *idx = i; + return 0; + } + } + return 1; +} + +#if 0 /* disabled code */ +static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + + xt_cache_elem = &xt_cache->extent_array[idx]; + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_elem->extent = NULL; + modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ + modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +} +#endif + +void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, + struct extent_io_descriptor *xt_io_descr) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + struct extent_io_descriptor *old_next; + + xt_cache_elem = &xt_cache->extent_array[idx]; + old_next = xt_cache_elem->inflight_io_descr->next; + xt_cache_elem->inflight_io_descr->next = xt_io_descr; + xt_io_descr->next = old_next; +} + +void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) +{ + unsigned i, j, page_offset; + struct rrdengine_instance *ctx = wc->ctx; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*extent->pages[j]->id, *descr->id) && + extent->pages[j]->page_length == descr->page_length && + extent->pages[j]->start_time == descr->start_time && + extent->pages[j]->end_time == descr->end_time) { + break; + } + page_offset += extent->pages[j]->page_length; + + } + /* care, we don't hold the descriptor mutex */ + (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + freez(xt_io_descr); +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -40,7 +219,7 @@ void read_extent_cb(uv_fs_t* req) int ret; unsigned i, j, count; void *page, *uncompressed_buf = NULL; - uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; + uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0; uint8_t have_read_error = 0; /* persistent structures */ struct rrdeng_df_extent_header *header; @@ -98,6 +277,33 @@ after_crc_check: debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); /* care, we don't hold the descriptor mutex */ } + { + uint8_t xt_is_cached = 0; + unsigned xt_idx; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; + struct extent_io_descriptor *curr, *next; + + if (have_read_error) { + memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); + } else { + (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); + } + /* complete all connected in-flight read requests */ + for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { + next = curr->next; + read_cached_extent_cb(wc, xt_idx, curr); + } + xt_cache_elem->inflight_io_descr = NULL; + modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + } + } for (i = 0 ; i < xt_io_descr->descr_count; ++i) { page = mallocz(RRDENG_BLOCK_SIZE); @@ -121,19 +327,19 @@ after_crc_check: } else { (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); } - pg_cache_replaceQ_insert(ctx, descr); rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; pg_cache_descr->flags |= RRD_PAGE_POPULATED; pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; - debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); if (xt_io_descr->release_descr) { - pg_cache_put_unsafe(descr); + pg_cache_put(ctx, descr); } else { - pg_cache_wake_up_waiters_unsafe(descr); + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); } - rrdeng_page_descr_mutex_unlock(ctx, descr); } if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { freez(uncompressed_buf); @@ -158,18 +364,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc, // uint32_t payload_length; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; + struct extent_info *extent = descr[0]->extent; + uint8_t xt_is_cached = 0, xt_is_inflight = 0; + unsigned xt_idx; - datafile = descr[0]->extent->datafile; - pos = descr[0]->extent->offset; - size_bytes = descr[0]->extent->size; + datafile = extent->datafile; + pos = extent->offset; + size_bytes = extent->size; - xt_io_descr = mallocz(sizeof(*xt_io_descr)); - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } + xt_io_descr = callocz(1, sizeof(*xt_io_descr)); for (i = 0 ; i < count; ++i) { rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; @@ -187,10 +390,34 @@ static void do_read_extent(struct rrdengine_worker_config* wc, /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached) { + xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); + xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); + if (xt_is_inflight) { + enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); + return; + } + return read_cached_extent_cb(wc, xt_idx, xt_io_descr); + } else { + ret = try_insert_into_xt_cache(wc, extent); + if (-1 != ret) { + xt_idx = (unsigned)ret; + modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); + wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; + } + } + + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr); + return;*/ + } real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); ctx->stats.io_read_bytes += real_io_size; ++ctx->stats.io_read_requests; ctx->stats.io_read_extent_bytes += real_io_size; @@ -243,11 +470,117 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty commit_data_extent(wc, (struct extent_io_descriptor *)data); break; default: - assert(type == STORE_DATA); + fatal_assert(type == STORE_DATA); break; } } +static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + int error; + + error = uv_thread_join(wc->now_invalidating_dirty_pages); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; +} + +static void invalidate_oldest_committed(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config *wc = &ctx->worker_config; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + Pvoid_t *PValue; + Word_t Index; + unsigned nr_committed_pages; + + do { + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + descr != NULL; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue) { + fatal_assert(0 != descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + fatal_assert(1 == ret); + break; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!descr) { + info("Failed to invalidate any dirty pages to relieve page cache pressure."); + + goto out; + } + pg_cache_punch_hole(ctx, descr, 1, 1, NULL); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); + rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + + } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); +out: + wc->cleanup_thread_invalidating_dirty_pages = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned nr_committed_pages; + int error; + + if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */ + return; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* delete the oldest page in memory */ + if (wc->now_invalidating_dirty_pages) { + /* already deleting a page */ + return; + } + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); + + wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); + wc->cleanup_thread_invalidating_dirty_pages = 0; + + error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + } + } +} + void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -294,6 +627,7 @@ void flush_pages_cb(uv_fs_t* req) uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); pg_cache->committed_page_index.nr_committed_pages -= count; uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + wc->inflight_dirty_pages -= count; } /* @@ -338,7 +672,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct descr = unlikely(NULL == PValue) ? NULL : *PValue) { uint8_t page_write_pending; - assert(0 != descr->page_length); + fatal_assert(0 != descr->page_length); page_write_pending = 0; rrdeng_page_descr_mutex_lock(ctx, descr); @@ -355,7 +689,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct if (page_write_pending) { ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); - assert(1 == ret); + fatal_assert(1 == ret); } } uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); @@ -366,6 +700,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct complete(completion); return 0; } + wc->inflight_dirty_pages += count; + xt_io_descr = mallocz(sizeof(*xt_io_descr)); payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); switch (compression_algorithm) { @@ -373,7 +709,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); break; default: /* Compress */ - assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); max_compressed_size = LZ4_compressBound(uncompressed_payload_length); compressed_buf = mallocz(max_compressed_size); size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); @@ -453,7 +789,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); - assert (-1 != ret); + fatal_assert(-1 != ret); ctx->stats.io_write_bytes += real_io_size; ++ctx->stats.io_write_requests; ctx->stats.io_write_extent_bytes += real_io_size; @@ -466,17 +802,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct return ALIGN_BYTES_CEILING(size_bytes); } -static void after_delete_old_data(uv_work_t *req, int status) +static void after_delete_old_data(struct rrdengine_worker_config* wc) { - struct rrdengine_instance *ctx = req->data; - struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_instance *ctx = wc->ctx; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; unsigned deleted_bytes, journalfile_bytes, datafile_bytes; - int ret; + int ret, error; char path[RRDENG_PATH_MAX]; - (void)status; datafile = ctx->datafiles.first; journalfile = datafile->journalfile; datafile_bytes = datafile->pos; @@ -503,19 +837,30 @@ static void after_delete_old_data(uv_work_t *req, int status) ctx->disk_space -= deleted_bytes; info("Reclaimed %u bytes of disk space.", deleted_bytes); + error = uv_thread_join(wc->now_deleting_files); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_deleting_files); /* unfreeze command processing */ - wc->now_deleting.data = NULL; - /* wake up event loop */ - assert(0 == uv_async_send(&wc->async)); + wc->now_deleting_files = NULL; + + wc->cleanup_thread_deleting_files = 0; + + /* interrupt event loop */ + uv_stop(wc->loop); } -static void delete_old_data(uv_work_t *req) +static void delete_old_data(void *arg) { - struct rrdengine_instance *ctx = req->data; + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; struct rrdeng_page_descr *descr; unsigned count, i; + uint8_t can_delete_metric; + uuid_t metric_id; /* Safe to use since it will be deleted after we are done */ datafile = ctx->datafiles.first; @@ -524,11 +869,21 @@ static void delete_old_data(uv_work_t *req) count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; - pg_cache_punch_hole(ctx, descr, 0); + can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); + if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) { + /* + * If the metric is empty, has no active writers and if the metadata log has been initialized then + * attempt to delete the corresponding netdata dimension. + */ + metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id); + } } next = extent->next; freez(extent); } + wc->cleanup_thread_deleting_files = 1; + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&wc->async)); } void rrdeng_test_quota(struct rrdengine_worker_config* wc) @@ -537,10 +892,11 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; - int ret; + int ret, error; out_of_space = 0; - if (unlikely(ctx->disk_space > ctx->max_disk_space)) { + /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */ + if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) { out_of_space = 1; } datafile = ctx->datafiles.last; @@ -557,9 +913,9 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) ++ctx->last_fileno; } } - if (unlikely(out_of_space)) { + if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) { /* delete old data */ - if (wc->now_deleting.data) { + if (wc->now_deleting_files) { /* already deleting data */ return; } @@ -571,8 +927,39 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) } info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); - wc->now_deleting.data = ctx; - assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); + wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); + wc->cleanup_thread_deleting_files = 0; + + error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_deleting_files); + wc->now_deleting_files = NULL; + } + } +} + +static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) +{ + if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { + return 1; + } + return 0; +} + +static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + + if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { + after_invalidate_oldest_committed(wc); + } + if (unlikely(wc->cleanup_thread_deleting_files)) { + after_delete_old_data(wc); + } + if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); } } @@ -591,8 +978,8 @@ void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) { wc->cmd_queue.head = wc->cmd_queue.tail = 0; wc->queue_size = 0; - assert(0 == uv_cond_init(&wc->cmd_cond)); - assert(0 == uv_mutex_init(&wc->cmd_mutex)); + fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); } void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) @@ -604,7 +991,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); } - assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); + fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); /* enqueue command */ wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? @@ -613,7 +1000,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) uv_mutex_unlock(&wc->cmd_mutex); /* wake up event loop */ - assert(0 == uv_async_send(&wc->async)); + fatal_assert(0 == uv_async_send(&wc->async)); } struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) @@ -657,39 +1044,44 @@ void async_cb(uv_async_t *handle) void timer_cb(uv_timer_t* handle) { struct rrdengine_worker_config* wc = handle->data; + struct rrdengine_instance *ctx = wc->ctx; uv_stop(handle->loop); uv_update_time(handle->loop); + if (unlikely(!ctx->metalog_ctx->initialized)) + return; /* Wait for the metadata log to initialize */ rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); - if (likely(!wc->now_deleting.data)) { - /* There is free space so we can write to disk */ - struct rrdengine_instance *ctx = wc->ctx; + if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { + /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ struct page_cache *pg_cache = &ctx->pg_cache; unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, high_watermark; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - producers = ctx->stats.metric_API_producers; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + producers = ctx->metric_API_max_producers; /* are flushable pages more than 25% of the maximum page cache size */ high_watermark = (ctx->max_cache_pages * 25LLU) / 100; low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ - if (nr_committed_pages > producers && - /* committed to be written pages are more than the produced number */ - nr_committed_pages - producers > high_watermark) { - /* Flushing speed must increase to stop page cache from filling with dirty pages */ - bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; - } - bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + /* Flush more pages only if disk can keep up */ + if (wc->inflight_dirty_pages < high_watermark + producers) { + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); - debug(D_RRDENGINE, "Flushing pages to disk."); - for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; - bytes_written && (total_bytes < bytes_to_write) ; - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 0, NULL); + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); + bytes_written && (total_bytes < bytes_to_write); + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } } } #ifdef NETDATA_INTERNAL_CHECKS @@ -730,7 +1122,12 @@ void rrdeng_worker(void* arg) } wc->async.data = wc; - wc->now_deleting.data = NULL; + wc->now_deleting_files = NULL; + wc->cleanup_thread_deleting_files = 0; + + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; + wc->inflight_dirty_pages = 0; /* dirty page flushing timer */ ret = uv_timer_init(loop, &timer_req); @@ -744,10 +1141,11 @@ void rrdeng_worker(void* arg) /* wake up initialization thread */ complete(&ctx->rrdengine_completion); - assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; - while (shutdown == 0 || uv_loop_alive(loop)) { + while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { uv_run(loop, UV_RUN_DEFAULT); + rrdeng_cleanup_finished_threads(wc); /* wait for commands */ cmd_batch_size = 0; @@ -769,14 +1167,20 @@ void rrdeng_worker(void* arg) break; case RRDENG_SHUTDOWN: shutdown = 1; - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&wc->async, NULL); - assert(0 == uv_timer_stop(&timer_req)); + break; + case RRDENG_QUIESCE: + ctx->drop_metrics_under_page_cache_pressure = 0; + ctx->quiesce = SET_QUIESCE; + fatal_assert(0 == uv_timer_stop(&timer_req)); uv_close((uv_handle_t *)&timer_req, NULL); + while (do_flush_pages(wc, 1, NULL)) { + ; /* Force flushing of all committed pages. */ + } + wal_flush_transaction_buffer(wc); + if (!rrdeng_threads_alive(wc)) { + ctx->quiesce = QUIESCED; + complete(&ctx->rrdengine_completion); + } break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); @@ -788,16 +1192,16 @@ void rrdeng_worker(void* arg) do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { - unsigned bytes_written; - - /* First I/O should be enough to call completion */ - bytes_written = do_flush_pages(wc, 1, cmd.completion); - if (bytes_written) { - while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) { - ; /* Force flushing of all committed pages if there is free space. */ - } + if (wc->now_invalidating_dirty_pages) { + /* Do not flush if the disk cannot keep up */ + complete(cmd.completion); + } else { + (void)do_flush_pages(wc, 1, cmd.completion); } break; + case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: + rrdeng_invalidate_oldest_committed(wc); + break; } default: debug(D_RRDENGINE, "%s: default.", __func__); @@ -805,11 +1209,17 @@ void rrdeng_worker(void* arg) } } while (opcode != RRDENG_NOOP); } + /* cleanup operations of the event loop */ - if (unlikely(wc->now_deleting.data)) { - info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); - } info("Shutting down RRD engine event loop."); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + while (do_flush_pages(wc, 1, NULL)) { ; /* Force flushing of all committed pages. */ } @@ -820,7 +1230,7 @@ void rrdeng_worker(void* arg) /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ - assert(0 == uv_loop_close(loop)); + fatal_assert(0 == uv_loop_close(loop)); freez(loop); return; @@ -828,7 +1238,7 @@ void rrdeng_worker(void* arg) error_after_timer_init: uv_close((uv_handle_t *)&wc->async, NULL); error_after_async_init: - assert(0 == uv_loop_close(loop)); + fatal_assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); @@ -845,11 +1255,12 @@ void rrdengine_main(void) int ret; struct rrdengine_instance *ctx; - ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); + sanity_check(); + ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB); if (ret) { exit(ret); } rrdeng_exit(ctx); fprintf(stderr, "Hello world!"); exit(0); -}
\ No newline at end of file +} |