summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c587
1 files changed, 499 insertions, 88 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index b6b6548ec..43135ff01 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -6,9 +6,10 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
-rrdeng_stats_t global_flushing_errors = 0;
+rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
+rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
-void sanity_check(void)
+static void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
@@ -26,10 +27,188 @@ void sanity_check(void)
/* page count must fit in 8 bits */
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
+ /* extent cache count must fit in 32 bits */
+ BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
+
/* page info scratch space must be able to hold 2 32-bit integers */
BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
}
+/* always inserts into tail */
+static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+
+ xt_cache_elem->prev = NULL;
+ xt_cache_elem->next = NULL;
+
+ if (likely(NULL != xt_cache->replaceQ_tail)) {
+ xt_cache_elem->prev = xt_cache->replaceQ_tail;
+ xt_cache->replaceQ_tail->next = xt_cache_elem;
+ }
+ if (unlikely(NULL == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = xt_cache_elem;
+ }
+ xt_cache->replaceQ_tail = xt_cache_elem;
+}
+
+static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *prev, *next;
+
+ prev = xt_cache_elem->prev;
+ next = xt_cache_elem->next;
+
+ if (likely(NULL != prev)) {
+ prev->next = next;
+ }
+ if (likely(NULL != next)) {
+ next->prev = prev;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) {
+ xt_cache->replaceQ_head = next;
+ }
+ if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) {
+ xt_cache->replaceQ_tail = prev;
+ }
+ xt_cache_elem->prev = xt_cache_elem->next = NULL;
+}
+
+static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc,
+ struct extent_cache_element *xt_cache_elem)
+{
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+}
+
+/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */
+static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned idx;
+ int ret;
+
+ ret = find_first_zero(xt_cache->allocation_bitmap);
+ if (-1 == ret || ret >= MAX_CACHED_EXTENTS) {
+ for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) {
+ idx = xt_cache_elem - xt_cache->extent_array;
+ if (!check_bit(xt_cache->inflight_bitmap, idx)) {
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ break;
+ }
+ }
+ if (NULL == xt_cache_elem)
+ return -1;
+ } else {
+ idx = (unsigned)ret;
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ }
+ xt_cache_elem->extent = extent;
+ xt_cache_elem->fileno = extent->datafile->fileno;
+ xt_cache_elem->inflight_io_descr = NULL;
+ xt_cache_replaceQ_insert(wc, xt_cache_elem);
+ modify_bit(&xt_cache->allocation_bitmap, idx, 1);
+
+ return (int)idx;
+}
+
+/**
+ * Returns 0 if the cached extent was found in the extent cache, 1 otherwise.
+ * Sets *idx to point to the position of the extent inside the cache.
+ **/
+static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ unsigned i;
+
+ for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) {
+ xt_cache_elem = &xt_cache->extent_array[i];
+ if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent &&
+ xt_cache_elem->fileno == extent->datafile->fileno) {
+ *idx = i;
+ return 0;
+ }
+ }
+ return 1;
+}
+
+#if 0 /* disabled code */
+static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ xt_cache_replaceQ_delete(wc, xt_cache_elem);
+ xt_cache_elem->extent = NULL;
+ modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */
+ modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */
+}
+#endif
+
+void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx,
+ struct extent_io_descriptor *xt_io_descr)
+{
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem;
+ struct extent_io_descriptor *old_next;
+
+ xt_cache_elem = &xt_cache->extent_array[idx];
+ old_next = xt_cache_elem->inflight_io_descr->next;
+ xt_cache_elem->inflight_io_descr->next = xt_io_descr;
+ xt_io_descr->next = old_next;
+}
+
+void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr)
+{
+ unsigned i, j, page_offset;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ void *page;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
+ page = mallocz(RRDENG_BLOCK_SIZE);
+ descr = xt_io_descr->descr_array[i];
+ for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) {
+ /* care, we don't hold the descriptor mutex */
+ if (!uuid_compare(*extent->pages[j]->id, *descr->id) &&
+ extent->pages[j]->page_length == descr->page_length &&
+ extent->pages[j]->start_time == descr->start_time &&
+ extent->pages[j]->end_time == descr->end_time) {
+ break;
+ }
+ page_offset += extent->pages[j]->page_length;
+
+ }
+ /* care, we don't hold the descriptor mutex */
+ (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags |= RRD_PAGE_POPULATED;
+ pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
+ if (xt_io_descr->release_descr) {
+ pg_cache_put(ctx, descr);
+ } else {
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
+ }
+ }
+ if (xt_io_descr->completion)
+ complete(xt_io_descr->completion);
+ freez(xt_io_descr);
+}
+
void read_extent_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -40,7 +219,7 @@ void read_extent_cb(uv_fs_t* req)
int ret;
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
- uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
+ uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0;
uint8_t have_read_error = 0;
/* persistent structures */
struct rrdeng_df_extent_header *header;
@@ -98,6 +277,33 @@ after_crc_check:
debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
/* care, we don't hold the descriptor mutex */
}
+ {
+ uint8_t xt_is_cached = 0;
+ unsigned xt_idx;
+ struct extent_info *extent = xt_io_descr->descr_array[0]->extent;
+
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) {
+ struct extent_cache *xt_cache = &wc->xt_cache;
+ struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx];
+ struct extent_io_descriptor *curr, *next;
+
+ if (have_read_error) {
+ memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages));
+ } else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length);
+ } else {
+ (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length);
+ }
+ /* complete all connected in-flight read requests */
+ for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) {
+ next = curr->next;
+ read_cached_extent_cb(wc, xt_idx, curr);
+ }
+ xt_cache_elem->inflight_io_descr = NULL;
+ modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */
+ }
+ }
for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
page = mallocz(RRDENG_BLOCK_SIZE);
@@ -121,19 +327,19 @@ after_crc_check:
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
- pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
- debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
if (xt_io_descr->release_descr) {
- pg_cache_put_unsafe(descr);
+ pg_cache_put(ctx, descr);
} else {
- pg_cache_wake_up_waiters_unsafe(descr);
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
}
- rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
@@ -158,18 +364,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
// uint32_t payload_length;
struct extent_io_descriptor *xt_io_descr;
struct rrdengine_datafile *datafile;
+ struct extent_info *extent = descr[0]->extent;
+ uint8_t xt_is_cached = 0, xt_is_inflight = 0;
+ unsigned xt_idx;
- datafile = descr[0]->extent->datafile;
- pos = descr[0]->extent->offset;
- size_bytes = descr[0]->extent->size;
+ datafile = extent->datafile;
+ pos = extent->offset;
+ size_bytes = extent->size;
- xt_io_descr = mallocz(sizeof(*xt_io_descr));
- ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- /* freez(xt_io_descr);
- return;*/
- }
+ xt_io_descr = callocz(1, sizeof(*xt_io_descr));
for (i = 0 ; i < count; ++i) {
rrdeng_page_descr_mutex_lock(ctx, descr[i]);
pg_cache_descr = descr[i]->pg_cache_descr;
@@ -187,10 +390,34 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
/* xt_io_descr->descr_commit_idx_array[0] */
xt_io_descr->release_descr = release_descr;
+ xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
+ if (xt_is_cached) {
+ xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]);
+ xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx);
+ if (xt_is_inflight) {
+ enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr);
+ return;
+ }
+ return read_cached_extent_cb(wc, xt_idx, xt_io_descr);
+ } else {
+ ret = try_insert_into_xt_cache(wc, extent);
+ if (-1 != ret) {
+ xt_idx = (unsigned)ret;
+ modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1);
+ wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr;
+ }
+ }
+
+ ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ /* freez(xt_io_descr);
+ return;*/
+ }
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
ctx->stats.io_read_bytes += real_io_size;
++ctx->stats.io_read_requests;
ctx->stats.io_read_extent_bytes += real_io_size;
@@ -243,11 +470,117 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty
commit_data_extent(wc, (struct extent_io_descriptor *)data);
break;
default:
- assert(type == STORE_DATA);
+ fatal_assert(type == STORE_DATA);
break;
}
}
+static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ int error;
+
+ error = uv_thread_join(wc->now_invalidating_dirty_pages);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+}
+
+static void invalidate_oldest_committed(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config *wc = &ctx->worker_config;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ Pvoid_t *PValue;
+ Word_t Index;
+ unsigned nr_committed_pages;
+
+ do {
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ descr != NULL;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ fatal_assert(0 != descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ fatal_assert(1 == ret);
+ break;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!descr) {
+ info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+
+ goto out;
+ }
+ pg_cache_punch_hole(ctx, descr, 1, 1, NULL);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
+ rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+
+ } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
+out:
+ wc->cleanup_thread_invalidating_dirty_pages = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
+}
+
+void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned nr_committed_pages;
+ int error;
+
+ if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */
+ return;
+
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* delete the oldest page in memory */
+ if (wc->now_invalidating_dirty_pages) {
+ /* already deleting a page */
+ return;
+ }
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+
+ wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+
+ error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ }
+ }
+}
+
void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -294,6 +627,7 @@ void flush_pages_cb(uv_fs_t* req)
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
pg_cache->committed_page_index.nr_committed_pages -= count;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ wc->inflight_dirty_pages -= count;
}
/*
@@ -338,7 +672,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
uint8_t page_write_pending;
- assert(0 != descr->page_length);
+ fatal_assert(0 != descr->page_length);
page_write_pending = 0;
rrdeng_page_descr_mutex_lock(ctx, descr);
@@ -355,7 +689,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (page_write_pending) {
ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
- assert(1 == ret);
+ fatal_assert(1 == ret);
}
}
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
@@ -366,6 +700,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
complete(completion);
return 0;
}
+ wc->inflight_dirty_pages += count;
+
xt_io_descr = mallocz(sizeof(*xt_io_descr));
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
switch (compression_algorithm) {
@@ -373,7 +709,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
break;
default: /* Compress */
- assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
+ fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
compressed_buf = mallocz(max_compressed_size);
size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
@@ -453,7 +789,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb);
- assert (-1 != ret);
+ fatal_assert(-1 != ret);
ctx->stats.io_write_bytes += real_io_size;
++ctx->stats.io_write_requests;
ctx->stats.io_write_extent_bytes += real_io_size;
@@ -466,17 +802,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
return ALIGN_BYTES_CEILING(size_bytes);
}
-static void after_delete_old_data(uv_work_t *req, int status)
+static void after_delete_old_data(struct rrdengine_worker_config* wc)
{
- struct rrdengine_instance *ctx = req->data;
- struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_instance *ctx = wc->ctx;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
- int ret;
+ int ret, error;
char path[RRDENG_PATH_MAX];
- (void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
datafile_bytes = datafile->pos;
@@ -503,19 +837,30 @@ static void after_delete_old_data(uv_work_t *req, int status)
ctx->disk_space -= deleted_bytes;
info("Reclaimed %u bytes of disk space.", deleted_bytes);
+ error = uv_thread_join(wc->now_deleting_files);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_deleting_files);
/* unfreeze command processing */
- wc->now_deleting.data = NULL;
- /* wake up event loop */
- assert(0 == uv_async_send(&wc->async));
+ wc->now_deleting_files = NULL;
+
+ wc->cleanup_thread_deleting_files = 0;
+
+ /* interrupt event loop */
+ uv_stop(wc->loop);
}
-static void delete_old_data(uv_work_t *req)
+static void delete_old_data(void *arg)
{
- struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
struct rrdeng_page_descr *descr;
unsigned count, i;
+ uint8_t can_delete_metric;
+ uuid_t metric_id;
/* Safe to use since it will be deleted after we are done */
datafile = ctx->datafiles.first;
@@ -524,11 +869,21 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr, 0);
+ can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id);
+ if (unlikely(can_delete_metric && ctx->metalog_ctx->initialized)) {
+ /*
+ * If the metric is empty, has no active writers and if the metadata log has been initialized then
+ * attempt to delete the corresponding netdata dimension.
+ */
+ metalog_delete_dimension_by_uuid(ctx->metalog_ctx, &metric_id);
+ }
}
next = extent->next;
freez(extent);
}
+ wc->cleanup_thread_deleting_files = 1;
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&wc->async));
}
void rrdeng_test_quota(struct rrdengine_worker_config* wc)
@@ -537,10 +892,11 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
- int ret;
+ int ret, error;
out_of_space = 0;
- if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
+ /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */
+ if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) {
out_of_space = 1;
}
datafile = ctx->datafiles.last;
@@ -557,9 +913,9 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
++ctx->last_fileno;
}
}
- if (unlikely(out_of_space)) {
+ if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) {
/* delete old data */
- if (wc->now_deleting.data) {
+ if (wc->now_deleting_files) {
/* already deleting data */
return;
}
@@ -571,8 +927,39 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
- wc->now_deleting.data = ctx;
- assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
+ wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
+ wc->cleanup_thread_deleting_files = 0;
+
+ error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_deleting_files);
+ wc->now_deleting_files = NULL;
+ }
+ }
+}
+
+static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
+{
+ if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
+ return 1;
+ }
+ return 0;
+}
+
+static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
+ after_invalidate_oldest_committed(wc);
+ }
+ if (unlikely(wc->cleanup_thread_deleting_files)) {
+ after_delete_old_data(wc);
+ }
+ if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) {
+ ctx->quiesce = QUIESCED;
+ complete(&ctx->rrdengine_completion);
}
}
@@ -591,8 +978,8 @@ void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
wc->queue_size = 0;
- assert(0 == uv_cond_init(&wc->cmd_cond));
- assert(0 == uv_mutex_init(&wc->cmd_mutex));
+ fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+ fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
@@ -604,7 +991,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) {
uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
}
- assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
+ fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE);
/* enqueue command */
wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ?
@@ -613,7 +1000,7 @@ void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd)
uv_mutex_unlock(&wc->cmd_mutex);
/* wake up event loop */
- assert(0 == uv_async_send(&wc->async));
+ fatal_assert(0 == uv_async_send(&wc->async));
}
struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc)
@@ -657,39 +1044,44 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
+ struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
+ if (unlikely(!ctx->metalog_ctx->initialized))
+ return; /* Wait for the metadata log to initialize */
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
- if (likely(!wc->now_deleting.data)) {
- /* There is free space so we can write to disk */
- struct rrdengine_instance *ctx = wc->ctx;
+ if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
+ /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
high_watermark;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
- producers = ctx->stats.metric_API_producers;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->metric_API_max_producers;
/* are flushable pages more than 25% of the maximum page cache size */
high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
- if (nr_committed_pages > producers &&
- /* committed to be written pages are more than the produced number */
- nr_committed_pages - producers > high_watermark) {
- /* Flushing speed must increase to stop page cache from filling with dirty pages */
- bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
- }
- bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+ /* Flush more pages only if disk can keep up */
+ if (wc->inflight_dirty_pages < high_watermark + producers) {
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
- debug(D_RRDENGINE, "Flushing pages to disk.");
- for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
- bytes_written && (total_bytes < bytes_to_write) ;
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 0, NULL);
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
+ bytes_written && (total_bytes < bytes_to_write);
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
}
}
#ifdef NETDATA_INTERNAL_CHECKS
@@ -730,7 +1122,12 @@ void rrdeng_worker(void* arg)
}
wc->async.data = wc;
- wc->now_deleting.data = NULL;
+ wc->now_deleting_files = NULL;
+ wc->cleanup_thread_deleting_files = 0;
+
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+ wc->inflight_dirty_pages = 0;
/* dirty page flushing timer */
ret = uv_timer_init(loop, &timer_req);
@@ -744,10 +1141,11 @@ void rrdeng_worker(void* arg)
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
- assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+ fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
- while (shutdown == 0 || uv_loop_alive(loop)) {
+ while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
uv_run(loop, UV_RUN_DEFAULT);
+ rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
cmd_batch_size = 0;
@@ -769,14 +1167,20 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&wc->async, NULL);
- assert(0 == uv_timer_stop(&timer_req));
+ break;
+ case RRDENG_QUIESCE:
+ ctx->drop_metrics_under_page_cache_pressure = 0;
+ ctx->quiesce = SET_QUIESCE;
+ fatal_assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all committed pages. */
+ }
+ wal_flush_transaction_buffer(wc);
+ if (!rrdeng_threads_alive(wc)) {
+ ctx->quiesce = QUIESCED;
+ complete(&ctx->rrdengine_completion);
+ }
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -788,16 +1192,16 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
- unsigned bytes_written;
-
- /* First I/O should be enough to call completion */
- bytes_written = do_flush_pages(wc, 1, cmd.completion);
- if (bytes_written) {
- while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
- ; /* Force flushing of all committed pages if there is free space. */
- }
+ if (wc->now_invalidating_dirty_pages) {
+ /* Do not flush if the disk cannot keep up */
+ complete(cmd.completion);
+ } else {
+ (void)do_flush_pages(wc, 1, cmd.completion);
}
break;
+ case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
+ rrdeng_invalidate_oldest_committed(wc);
+ break;
}
default:
debug(D_RRDENGINE, "%s: default.", __func__);
@@ -805,11 +1209,17 @@ void rrdeng_worker(void* arg)
}
} while (opcode != RRDENG_NOOP);
}
+
/* cleanup operations of the event loop */
- if (unlikely(wc->now_deleting.data)) {
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- }
info("Shutting down RRD engine event loop.");
+
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all committed pages. */
}
@@ -820,7 +1230,7 @@ void rrdeng_worker(void* arg)
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
- assert(0 == uv_loop_close(loop));
+ fatal_assert(0 == uv_loop_close(loop));
freez(loop);
return;
@@ -828,7 +1238,7 @@ void rrdeng_worker(void* arg)
error_after_timer_init:
uv_close((uv_handle_t *)&wc->async, NULL);
error_after_async_init:
- assert(0 == uv_loop_close(loop));
+ fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
@@ -845,11 +1255,12 @@ void rrdengine_main(void)
int ret;
struct rrdengine_instance *ctx;
- ret = rrdeng_init(&ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
+ sanity_check();
+ ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
if (ret) {
exit(ret);
}
rrdeng_exit(ctx);
fprintf(stderr, "Hello world!");
exit(0);
-} \ No newline at end of file
+}