summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-07-20 04:49:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-07-20 04:49:55 +0000
commitab1bb5b7f1c3c3a7b240ab7fc8661459ecd7decb (patch)
tree7a900833aad3ccc685712c6c2a7d87576d54f427 /database/engine/rrdengine.c
parentAdding upstream version 1.40.1. (diff)
downloadnetdata-upstream/1.41.0.tar.xz
netdata-upstream/1.41.0.zip
Adding upstream version 1.41.0.upstream/1.41.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c123
1 files changed, 68 insertions, 55 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 7811a5eaa..ce363183d 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -351,7 +351,7 @@ static struct {
static void wal_cleanup1(void) {
WAL *wal = NULL;
- if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock))
+ if(!spinlock_trylock(&wal_globals.protected.spinlock))
return;
if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) {
@@ -360,7 +360,7 @@ static void wal_cleanup1(void) {
wal_globals.protected.available--;
}
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
if(wal) {
posix_memfree(wal->buf);
@@ -375,7 +375,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
WAL *wal = NULL;
- netdata_spinlock_lock(&wal_globals.protected.spinlock);
+ spinlock_lock(&wal_globals.protected.spinlock);
if(likely(wal_globals.protected.available_items)) {
wal = wal_globals.protected.available_items;
@@ -384,7 +384,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
}
uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
if(unlikely(!wal)) {
wal = mallocz(sizeof(WAL));
@@ -416,10 +416,10 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
void wal_release(WAL *wal) {
if(unlikely(!wal)) return;
- netdata_spinlock_lock(&wal_globals.protected.spinlock);
+ spinlock_lock(&wal_globals.protected.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
wal_globals.protected.available++;
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
}
// ----------------------------------------------------------------------------
@@ -459,7 +459,7 @@ void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) {
}
void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) {
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
struct rrdeng_cmd *cmd = get_cmd_cb(data);
if(cmd) {
@@ -472,7 +472,7 @@ void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY
}
}
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
}
void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion,
@@ -489,12 +489,12 @@ void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, v
cmd->priority = priority;
cmd->dequeue_cb = dequeue_cb;
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
rrdeng_main.cmd_queue.unsafe.waiting++;
if(enqueue_cb)
enqueue_cb(cmd);
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
}
@@ -532,7 +532,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
}
// find an opcode to execute from the queue
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
if(cmd) {
@@ -559,7 +559,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
cmd->dequeue_cb = NULL;
}
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
struct rrdeng_cmd ret;
if(cmd) {
@@ -712,9 +712,9 @@ static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __m
posix_memfree(xt_io_descr->buf);
extent_io_descriptor_release(xt_io_descr);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.flushed_to_open_running--;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running)
// we just finished a flushing on a datafile that is not the active one
@@ -733,15 +733,15 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
if (uv_fs_request->result < 0) {
ctx_io_error(ctx);
- error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
+ netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
}
journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running--;
datafile->writers.flushed_to_open_running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
rrdeng_enq_cmd(xt_io_descr->ctx,
RRDENG_OPCODE_FLUSHED_TO_OPEN,
@@ -756,12 +756,12 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
bool ret = false;
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx))
ret = true;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
return ret;
}
@@ -773,9 +773,9 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
datafile = ctx->datafiles.first->prev;
// become a writer on this datafile, to prevent it from vanishing
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
if(datafile_is_full(ctx, datafile)) {
@@ -791,7 +791,7 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
datafile = ctx->datafiles.first->prev;
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
- if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0)
+ if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0)
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL,
NULL);
@@ -801,15 +801,15 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
datafile = ctx->datafiles.first->prev;
// become a writer on this datafile, to prevent it from vanishing
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
// release the writers on the old datafile
- netdata_spinlock_lock(&old_datafile->writers.spinlock);
+ spinlock_lock(&old_datafile->writers.spinlock);
old_datafile->writers.running--;
- netdata_spinlock_unlock(&old_datafile->writers.spinlock);
+ spinlock_unlock(&old_datafile->writers.spinlock);
}
return datafile;
@@ -921,11 +921,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
datafile = get_datafile_to_write_extent(ctx);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
xt_io_descr->datafile = datafile;
xt_io_descr->pos = datafile->pos;
datafile->pos += real_io_size;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
xt_io_descr->bytes = size_bytes;
xt_io_descr->uv_fs_request.data = xt_io_descr;
@@ -998,12 +998,14 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc
return next_datafile;
}
-void find_uuid_first_time(
+time_t find_uuid_first_time(
struct rrdengine_instance *ctx,
struct rrdengine_datafile *datafile,
struct uuid_first_time_s *uuid_first_entry_list,
size_t count)
{
+ time_t global_first_time_s = LONG_MAX;
+
// acquire the datafile to work with it
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
@@ -1011,7 +1013,7 @@ void find_uuid_first_time(
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
if (unlikely(!datafile))
- return;
+ return global_first_time_s;
unsigned journalfile_count = 0;
size_t binary_match = 0;
@@ -1025,6 +1027,10 @@ void find_uuid_first_time(
}
time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+
+ if(journal_start_time_s < global_first_time_s)
+ global_first_time_s = journal_start_time_s;
+
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
struct uuid_first_time_s *uuid_original_entry;
@@ -1137,9 +1143,13 @@ void find_uuid_first_time(
without_retention,
without_metric
);
+
+ return global_first_time_s;
}
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
+ time_t global_first_time_s = LONG_MAX;
+
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
@@ -1174,7 +1184,7 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
added++;
}
- info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
+ netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
ctx->config.tier, count, first_datafile_remaining->fileno);
journalfile_v2_data_release(journalfile);
@@ -1184,12 +1194,12 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
- find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
+ global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
- info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
+ netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
ctx->config.tier, added);
size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0;
@@ -1223,6 +1233,9 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
"DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry",
deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier);
+ if(global_first_time_s != LONG_MAX)
+ __atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED);
+
if(worker)
worker_is_idle();
}
@@ -1243,7 +1256,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
if (!datafile_got_for_deletion) {
- info("DBENGINE: waiting for data file '%s/"
+ netdata_log_info("DBENGINE: waiting for data file '%s/"
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
"' to be available for deletion, "
"it is in use currently by %u users.",
@@ -1255,7 +1268,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
}
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED);
- info("DBENGINE: deleting data file '%s/"
+ netdata_log_info("DBENGINE: deleting data file '%s/"
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
"'.",
ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
@@ -1277,26 +1290,26 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
journal_file_bytes = journalfile_current_size(journal_file);
deleted_bytes = journalfile_v2_data_size_get(journal_file);
- info("DBENGINE: deleting data and journal files to maintain disk quota");
+ netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota");
ret = journalfile_destroy_unsafe(journal_file, datafile);
if (!ret) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
journalfile_v2_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
deleted_bytes += journal_file_bytes;
}
ret = destroy_data_file_unsafe(datafile);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: deleted data file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted data file \"%s\".", path);
deleted_bytes += datafile_bytes;
}
freez(journal_file);
freez(datafile);
ctx_current_disk_space_decrease(ctx, deleted_bytes);
- info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
+ netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
}
static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
@@ -1334,11 +1347,11 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
// find a datafile to work
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
- if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock))
+ if(!spinlock_trylock(&datafile->populate_mrg.spinlock))
continue;
if(datafile->populate_mrg.populated) {
- netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
+ spinlock_unlock(&datafile->populate_mrg.spinlock);
continue;
}
@@ -1352,7 +1365,7 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
datafile->populate_mrg.populated = true;
- netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
+ spinlock_unlock(&datafile->populate_mrg.spinlock);
} while(1);
@@ -1376,7 +1389,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
if(!logged) {
logged = true;
- info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
+ netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED),
(ctx->config.legacy) ? -1 : ctx->config.tier);
}
@@ -1444,7 +1457,7 @@ void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
uv_update_time(handle->loop);
- debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+ netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
#define TIMER_PERIOD_MS (1000)
@@ -1496,17 +1509,17 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
continue;
}
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
if(!available) {
- info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
datafile = datafile->next;
continue;
}
- info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
@@ -1623,21 +1636,21 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
static bool spawned = false;
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
if(!spawned) {
int ret;
ret = uv_loop_init(&rrdeng_main.loop);
if (ret) {
- error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
return false;
}
rrdeng_main.loop.data = &rrdeng_main;
ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb);
if (ret) {
- error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
return false;
}
@@ -1645,7 +1658,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer);
if (ret) {
- error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
return false;
@@ -1658,7 +1671,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
spawned = true;
}
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
return true;
}
@@ -1860,7 +1873,7 @@ void dbengine_event_loop(void* arg) {
}
/* cleanup operations of the event loop */
- info("DBENGINE: shutting down dbengine thread");
+ netdata_log_info("DBENGINE: shutting down dbengine thread");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,