path: root/src/database/engine/rrdengine.c
author Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-05 11:19:16 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-07-24 09:53:24 +0000
commit b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch)
tree d4d31289c39fc00da064a825df13a0b98ce95b10 /src/database/engine/rrdengine.c
parent Adding upstream version 1.44.3. (diff)
Adding upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
-rw-r--r-- src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c) | 348
1 file changed, 281 insertions(+), 67 deletions(-)
diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c
index b82cc1ad1..2d6583ead 100644
--- a/database/engine/rrdengine.c
+++ b/src/database/engine/rrdengine.c
@@ -3,6 +3,7 @@
#include "rrdengine.h"
#include "pdc.h"
+#include "dbengine-compression.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
@@ -10,7 +11,7 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
-unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
+unsigned rrdeng_pages_per_extent = DEFAULT_PAGES_PER_EXTENT;
#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
@@ -39,6 +40,7 @@ struct rrdeng_main {
uv_loop_t loop;
uv_async_t async;
uv_timer_t timer;
+ uv_timer_t retention_timer;
pid_t tid;
bool shutdown;
@@ -110,16 +112,10 @@ static void sanity_check(void)
/* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
- BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
+ BUILD_BUG_ON(sizeof(nd_uuid_t) != UUID_SZ); /* check UUID size */
/* page count must fit in 8 bits */
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
-
- /* extent cache count must fit in 32 bits */
-// BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
-
- /* page info scratch space must be able to hold 2 32-bit integers */
- BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
}
// ----------------------------------------------------------------------------
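For reference, these BUILD_BUG_ON() checks are compile-time assertions: if a condition holds, the build fails instead of the daemon misbehaving at runtime. A minimal sketch of the common negative-array-size idiom (an assumed illustration, not necessarily Netdata's actual macro):

/* hypothetical compile-time assertion: a negative array size is rejected by the compiler */
#define SKETCH_BUILD_BUG_ON(cond) ((void)sizeof(char[1 - 2 * !!(cond)]))

/* usage mirroring the checks above */
SKETCH_BUILD_BUG_ON(sizeof(nd_uuid_t) != UUID_SZ);
SKETCH_BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);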
@@ -229,10 +225,10 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
worker_is_idle();
}
-static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) {
+static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) {
struct rrdeng_work *work_request = NULL;
- internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread");
+ internal_fatal(rrdeng_main.tid != gettid_cached(), "work_dispatch() can only be run from the event loop thread");
work_request = aral_mallocz(rrdeng_main.work_cmd.ar);
memset(work_request, 0, sizeof(struct rrdeng_work));
@@ -240,8 +236,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com
work_request->ctx = ctx;
work_request->data = data;
work_request->completion = completion;
- work_request->work_cb = work_cb;
- work_request->after_work_cb = after_work_cb;
+ work_request->work_cb = do_work_cb;
+ work_request->after_work_cb = do_after_work_cb;
work_request->opcode = opcode;
if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
@@ -772,13 +768,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
*/
static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
int ret;
- int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
- uint32_t uncompressed_payload_length, payload_offset;
+ uint32_t uncompressed_payload_length, max_compressed_size, payload_offset;
struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *xt_io_descr;
- struct extent_buffer *eb = NULL;
- void *compressed_buf = NULL;
Word_t Index;
uint8_t compression_algorithm = ctx->config.global_compress_alg;
struct rrdengine_datafile *datafile;
@@ -807,20 +800,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
xt_io_descr = extent_io_descriptor_get();
xt_io_descr->ctx = ctx;
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
- switch (compression_algorithm) {
- case RRD_NO_COMPRESSION:
- size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
- break;
-
- default: /* Compress */
- fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
- max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
- eb = extent_buffer_get(max_compressed_size);
- compressed_buf = eb->data;
- size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
- break;
- }
-
+ max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer);
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
@@ -832,23 +813,22 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos = 0;
header = xt_io_descr->buf;
- header->compression_algorithm = compression_algorithm;
header->number_of_pages = count;
pos += sizeof(*header);
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
header->descr[i].type = descr->type;
- uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
+ uuid_copy(*(nd_uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
header->descr[i].start_time_ut = descr->start_time_ut;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
header->descr[i].end_time_ut = descr->end_time_ut;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
break;
@@ -858,29 +838,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos += sizeof(header->descr[i]);
}
+
+ // build the extent payload
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
- if(likely(compression_algorithm == RRD_LZ4)) {
- compressed_size = LZ4_compress_default(
- xt_io_descr->buf + payload_offset,
- compressed_buf,
- (int)uncompressed_payload_length,
- max_compressed_size);
+ // compress the payload
+ size_t compressed_size =
+ (int)dbengine_compress(xt_io_descr->buf + payload_offset,
+ uncompressed_payload_length,
+ compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
+ internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed");
+ internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent");
- (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- extent_buffer_release(eb);
- size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ if(compressed_size) {
+ header->compression_algorithm = compression_algorithm;
header->payload_length = compressed_size;
}
- else { // RRD_NO_COMPRESSION
- header->payload_length = uncompressed_payload_length;
+ else {
+ // compression failed, or generated bigger pages
+ // so it didn't touch our uncompressed buffer
+ header->compression_algorithm = RRDENG_COMPRESSION_NONE;
+ header->payload_length = compressed_size = uncompressed_payload_length;
+ }
+
+ // set the correct size
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+
+ if(compression_algorithm != RRDENG_COMPRESSION_NONE) {
+ __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
}
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
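To summarize the new compression flow in one place (a hedged sketch; the dbengine_max_compressed_size()/dbengine_compress() signatures are inferred only from this hunk, not from dbengine-compression.h): the extent buffer is sized for the worst case, the payload is compressed in place, and a zero return value means the uncompressed payload is kept.

/* sketch only: helper signatures inferred from this diff */
static size_t sketch_finalize_payload(uint8_t *payload, uint32_t uncompressed_len, uint8_t *algorithm) {
    size_t compressed = dbengine_compress(payload, uncompressed_len, *algorithm);
    if (!compressed) {
        /* compression failed or would have grown the data; the buffer still holds the raw payload */
        *algorithm = RRDENG_COMPRESSION_NONE;
        compressed = uncompressed_len;
    }
    return compressed; /* bytes that actually go into the extent */
}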
@@ -939,7 +930,7 @@ static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused,
}
struct uuid_first_time_s {
- uuid_t *uuid;
+ nd_uuid_t *uuid;
time_t first_time_s;
METRIC *metric;
size_t pages_found;
@@ -1171,7 +1162,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
for (size_t index = 0; index < added; ++index) {
uuid_first_t_entry = &uuid_first_entry_list[index];
if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
- mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+
+ time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+
+ bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+ if (changed) {
+ uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) {
+ uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+ }
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
}
else {
@@ -1180,6 +1181,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
// there is no retention for this metric
bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
if (!has_retention) {
+ time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && first_time_s && last_time_s) {
+ uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+
bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
if(deleted)
deleted_metrics++;
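Worked example of the sample accounting above (illustrative numbers only): if a metric's first_time_s advances from 1,000 to 1,600 while its update_every_s is 10, rotation subtracts (1600 - 1000) / 10 = 60 samples from ctx->atomic.samples.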
@@ -1280,7 +1289,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true);
- if (rrdeng_ctx_exceeded_disk_quota(ctx))
+ if (rrdeng_ctx_tier_cap_exceeded(ctx))
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
rrdcontext_db_rotation();
@@ -1352,8 +1361,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
if(!logged) {
logged = true;
netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
- __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED),
- (ctx->config.legacy) ? -1 : ctx->config.tier);
+ __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), ctx->config.tier);
}
sleep_usec(1 * USEC_PER_MS);
}
@@ -1390,26 +1398,27 @@ static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused,
}
uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
- uint64_t target_size = ctx->config.max_disk_space / TARGET_DATAFILES;
+ uint64_t target_size = ctx->config.max_disk_space ? ctx->config.max_disk_space / TARGET_DATAFILES : MAX_DATAFILE_SIZE;
target_size = MIN(target_size, MAX_DATAFILE_SIZE);
target_size = MAX(target_size, MIN_DATAFILE_SIZE);
return target_size;
}
-bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx)
+time_t get_datafile_end_time(struct rrdengine_instance *ctx)
{
- if(!ctx->datafiles.first)
- // no datafiles available
- return false;
+ time_t last_time_s = 0;
- if(!ctx->datafiles.first->next)
- // only 1 datafile available
- return false;
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ struct rrdengine_datafile *datafile = ctx->datafiles.first;
- uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) -
- (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0);
+ if (datafile) {
+ last_time_s = datafile->journalfile->v2.last_time_s;
+ if (!last_time_s)
+ last_time_s = datafile->journalfile->v2.first_time_s;
+ }
- return estimated_disk_space > ctx->config.max_disk_space;
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+ return last_time_s;
}
/* return 0 on success */
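Note that get_datafile_end_time() holds the datafiles read lock for the whole lookup. For reference, the libuv read-write lock pattern it relies on (a minimal generic sketch, not project code; 'shared_state' is a hypothetical placeholder):

uv_rwlock_t lock;
uv_rwlock_init(&lock);        /* once, at setup */

uv_rwlock_rdlock(&lock);      /* many readers may hold the lock concurrently */
/* ... read shared_state ... */
uv_rwlock_rdunlock(&lock);

uv_rwlock_wrlock(&lock);      /* writers get exclusive access */
/* ... modify shared_state ... */
uv_rwlock_wrunlock(&lock);

uv_rwlock_destroy(&lock);     /* once, at teardown */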
@@ -1580,6 +1589,80 @@ static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, vo
return data;
}
+uint64_t get_used_disk_space(struct rrdengine_instance *ctx)
+{
+ uint64_t active_space = 0;
+
+ if (ctx->datafiles.first && ctx->datafiles.first->prev)
+ active_space = ctx->datafiles.first->prev->pos;
+
+ uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - active_space;
+
+ uint64_t database_space = get_total_database_space();
+ uint64_t adjusted_database_space = database_space * ctx->config.disk_percentage / 100 ;
+ estimated_disk_space += adjusted_database_space;
+
+ return estimated_disk_space;
+}
+
+static time_t get_tier_retention(struct rrdengine_instance *ctx)
+{
+ time_t retention = 0;
+ if (localhost) {
+ STORAGE_ENGINE *eng = localhost->db[ctx->config.tier].eng;
+ if (eng) {
+ time_t first_time_s = get_datafile_end_time(ctx);
+ if (first_time_s)
+ retention = now_realtime_sec() - first_time_s;
+ }
+ }
+ return retention;
+}
+
+// Check if disk or retention time cap reached
+bool rrdeng_ctx_tier_cap_exceeded(struct rrdengine_instance *ctx)
+{
+ if(!ctx->datafiles.first)
+ // no datafiles available
+ return false;
+
+ if(!ctx->datafiles.first->next)
+ // only 1 datafile available
+ return false;
+
+ uint64_t estimated_disk_space = get_used_disk_space(ctx);
+ time_t retention = get_tier_retention(ctx);
+
+ if (ctx->config.max_retention_s && retention > ctx->config.max_retention_s)
+ return true;
+
+ if (ctx->config.max_disk_space && estimated_disk_space > ctx->config.max_disk_space)
+ return true;
+
+ return false;
+}
+
+void retention_timer_cb(uv_timer_t *handle)
+{
+ if (!localhost)
+ return;
+
+ worker_is_busy(RRDENG_TIMER_CB);
+ uv_stop(handle->loop);
+ uv_update_time(handle->loop);
+
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ STORAGE_ENGINE *eng = localhost->db[tier].eng;
+ if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE)
+ continue;
+ bool cleanup = rrdeng_ctx_tier_cap_exceeded(multidb_ctx[tier]);
+ if (cleanup)
+ rrdeng_enq_cmd(multidb_ctx[tier], RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+ }
+
+ worker_is_idle();
+}
+
void timer_cb(uv_timer_t* handle) {
worker_is_busy(RRDENG_TIMER_CB);
uv_stop(handle->loop);
@@ -1643,7 +1726,17 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
return false;
}
+
+ ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.retention_timer);
+ if (ret) {
+ netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
+ uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
+ fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
+ return false;
+ }
+
rrdeng_main.timer.data = &rrdeng_main;
+ rrdeng_main.retention_timer.data = &rrdeng_main;
dbengine_initialize_structures();
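The new retention timer follows the usual libuv timer lifecycle, which the rest of this diff completes (started in dbengine_event_loop(), stopped and closed on shutdown). A minimal generic sketch, assuming a loop, callback and context ('my_loop', 'my_cb', 'some_context' are hypothetical placeholders):

uv_timer_t t;
int rc = uv_timer_init(&my_loop, &t);             /* bind the handle to the loop */
if (!rc) {
    t.data = some_context;                        /* optional user data, read back inside the callback */
    rc = uv_timer_start(&t, my_cb, 60000, 60000); /* first fire after 60s, then every 60s */
}
/* ... later, on shutdown ... */
uv_timer_stop(&t);
uv_close((uv_handle_t *)&t, NULL);                /* close handles before uv_loop_close() */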
@@ -1675,9 +1768,125 @@ static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_w
work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL);
}
+uint64_t get_directory_free_bytes_space(struct rrdengine_instance *ctx)
+{
+ uint64_t free_bytes = 0;
+ struct statvfs buff_statvfs;
+ if (statvfs(ctx->config.dbfiles_path, &buff_statvfs) == 0)
+ free_bytes = buff_statvfs.f_bavail * buff_statvfs.f_bsize;
+
+ return (free_bytes - (free_bytes * 5 / 100));
+}
+
+void calculate_tier_disk_space_percentage(void)
+{
+ uint64_t tier_space[RRD_STORAGE_TIERS];
+
+ if (!localhost)
+ return;
+
+ uint64_t total_diskspace = 0;
+ for(size_t tier = 0; tier < storage_tiers ;tier++) {
+ STORAGE_ENGINE *eng = localhost->db[tier].eng;
+ if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) {
+ tier_space[tier] = 0;
+ continue;
+ }
+ uint64_t tier_disk_space = multidb_ctx[tier]->config.max_disk_space ?
+ multidb_ctx[tier]->config.max_disk_space :
+ get_directory_free_bytes_space(multidb_ctx[tier]);
+ total_diskspace += tier_disk_space;
+ tier_space[tier] = tier_disk_space;
+ }
+
+ if (total_diskspace) {
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ multidb_ctx[tier]->config.disk_percentage = (100 * tier_space[tier] / total_diskspace);
+ }
+ }
+}
+
+void dbengine_retention_statistics(void)
+{
+ static bool init = false;
+ static DBENGINE_TIER_STATS stats[RRD_STORAGE_TIERS];
+
+ if (!localhost)
+ return;
+
+ calculate_tier_disk_space_percentage();
+
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ STORAGE_ENGINE *eng = localhost->db[tier].eng;
+ if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE)
+ continue;
+
+ if (init == false) {
+ char id[200];
+ snprintfz(id, sizeof(id) - 1, "dbengine_retention_tier%zu", tier);
+ stats[tier].st = rrdset_create_localhost(
+ "netdata",
+ id,
+ NULL,
+ "dbengine retention",
+ "netdata.dbengine_tier_retention",
+ "dbengine space and time retention",
+ "%",
+ "netdata",
+ "stats",
+ 134900, // before "dbengine memory" (dbengine2_statistics_charts)
+ 10,
+ RRDSET_TYPE_LINE);
+
+ stats[tier].rd_space = rrddim_add(stats[tier].st, "space", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats[tier].rd_time = rrddim_add(stats[tier].st, "time", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+
+ char tier_str[5];
+ snprintfz(tier_str, 4, "%zu", tier);
+ rrdlabels_add(stats[tier].st->rrdlabels, "tier", tier_str, RRDLABEL_SRC_AUTO);
+
+ rrdset_flag_set(stats[tier].st, RRDSET_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(stats[tier].st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
+ rrdset_metadata_updated(stats[tier].st);
+ }
+
+ time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si);
+ time_t retention = first_time_s ? now_realtime_sec() - first_time_s : 0;
+
+ //
+ // Note: storage_engine_disk_space_used is the exact diskspace (as reported by api/v2/node_instances
+ // get_used_disk_space is used to determine if database cleanup (file rotation should happen)
+ // and adds to the disk space used the desired file size of the active
+ // datafile
+ uint64_t disk_space = get_used_disk_space(multidb_ctx[tier]);
+ //uint64_t disk_space = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si);
+
+ uint64_t config_disk_space = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si);
+ if (!config_disk_space) {
+ config_disk_space = get_directory_free_bytes_space(multidb_ctx[tier]);
+ config_disk_space += disk_space;
+ }
+
+ collected_number disk_percentage = (collected_number) (config_disk_space ? 100 * disk_space / config_disk_space : 0);
+
+ collected_number retention_percentage = (collected_number)multidb_ctx[tier]->config.max_retention_s ?
+ 100 * retention / multidb_ctx[tier]->config.max_retention_s :
+ 0;
+
+ if (retention_percentage > 100)
+ retention_percentage = 100;
+
+ rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_space, (collected_number) disk_percentage);
+ rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_time, (collected_number) retention_percentage);
+
+ rrdset_done(stats[tier].st);
+ }
+ init = true;
+}
+
void dbengine_event_loop(void* arg) {
sanity_check();
- uv_thread_set_name_np(pthread_self(), "DBENGINE");
+ uv_thread_set_name_np("DBENGINE");
service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register("DBENGINE");
@@ -1721,9 +1930,10 @@ void dbengine_event_loop(void* arg) {
struct rrdeng_main *main = arg;
enum rrdeng_opcode opcode;
struct rrdeng_cmd cmd;
- main->tid = gettid();
+ main->tid = gettid_cached();
fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
+ fatal_assert(0 == uv_timer_start(&main->retention_timer, retention_timer_cb, TIMER_PERIOD_MS * 60, TIMER_PERIOD_MS * 60));
bool shutdown = false;
while (likely(!shutdown)) {
@@ -1804,7 +2014,7 @@ void dbengine_event_loop(void* arg) {
if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) &&
ctx->datafiles.first->next != NULL &&
ctx->datafiles.first->next->next != NULL &&
- rrdeng_ctx_exceeded_disk_quota(ctx)) {
+ rrdeng_ctx_tier_cap_exceeded(ctx)) {
__atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED);
work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate);
@@ -1841,7 +2051,11 @@ void dbengine_event_loop(void* arg) {
uv_close((uv_handle_t *)&main->async, NULL);
(void) uv_timer_stop(&main->timer);
uv_close((uv_handle_t *)&main->timer, NULL);
+
+ (void) uv_timer_stop(&main->retention_timer);
+ uv_close((uv_handle_t *)&main->retention_timer, NULL);
shutdown = true;
+ break;
}
case RRDENG_OPCODE_NOOP: {