From 2d852004321781e79bb7f59bf61603d66000daae Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 5 Dec 2022 17:29:34 +0100 Subject: Merging upstream version 1.37.1. Signed-off-by: Daniel Baumann --- .github/data/distros.yml | 2 +- CHANGELOG.md | 25 +- collectors/diskspace.plugin/plugin_diskspace.c | 2 +- collectors/proc.plugin/ipc.c | 8 +- daemon/global_statistics.c | 267 ++++++++++++++------- daemon/service.c | 6 +- database/engine/pagecache.c | 9 +- database/engine/pagecache.h | 1 - database/engine/rrdengine.c | 36 ++- database/engine/rrdengine.h | 2 + database/engine/rrdengineapi.c | 98 ++++---- database/engine/rrdengineapi.h | 10 +- database/ram/rrddim_mem.c | 8 +- database/ram/rrddim_mem.h | 6 +- database/rrd.h | 6 +- database/rrdcontext.c | 7 +- database/rrddim.c | 12 +- database/sqlite/sqlite_context.c | 2 + database/sqlite/sqlite_functions.c | 2 + libnetdata/dictionary/dictionary.c | 42 ++-- libnetdata/procfile/procfile.c | 67 +++--- libnetdata/procfile/procfile.h | 15 +- libnetdata/socket/security.c | 38 ++- libnetdata/string/string.c | 9 +- libnetdata/worker_utilization/worker_utilization.c | 94 +++++--- libnetdata/worker_utilization/worker_utilization.h | 4 +- packaging/version | 2 +- streaming/replication.c | 213 +++++++++------- streaming/rrdpush.h | 12 +- streaming/sender.c | 11 +- web/api/queries/query.c | 10 +- 31 files changed, 624 insertions(+), 402 deletions(-) diff --git a/.github/data/distros.yml b/.github/data/distros.yml index 0f5718645..cc5275298 100644 --- a/.github/data/distros.yml +++ b/.github/data/distros.yml @@ -104,7 +104,7 @@ include: dnf remove -y json-c-devel packages: &fedora_packages type: rpm - repo_distro: fedora/36 + repo_distro: fedora/37 arches: - x86_64 - aarch64 diff --git a/CHANGELOG.md b/CHANGELOG.md index a46b77108..7619153da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +## [v1.37.1](https://github.com/netdata/netdata/tree/v1.37.1) (2022-12-05) + +[Full 
Changelog](https://github.com/netdata/netdata/compare/v1.37.0...v1.37.1) + +**Merged pull requests:** + +- fix v1.37 dbengine page alignment crashes [\#14086](https://github.com/netdata/netdata/pull/14086) ([ktsaou](https://github.com/ktsaou)) +- Fix \_\_atomic\_compare\_exchange\_n\(\) atomics [\#14085](https://github.com/netdata/netdata/pull/14085) ([ktsaou](https://github.com/ktsaou)) +- Fix 1.37 crashes [\#14081](https://github.com/netdata/netdata/pull/14081) ([stelfrag](https://github.com/stelfrag)) +- add basic dashboard info for NGINX Plus [\#14080](https://github.com/netdata/netdata/pull/14080) ([ilyam8](https://github.com/ilyam8)) +- replication fixes 9 [\#14079](https://github.com/netdata/netdata/pull/14079) ([ktsaou](https://github.com/ktsaou)) +- optimize workers statistics performance [\#14077](https://github.com/netdata/netdata/pull/14077) ([ktsaou](https://github.com/ktsaou)) +- fix SSL related crashes [\#14076](https://github.com/netdata/netdata/pull/14076) ([ktsaou](https://github.com/ktsaou)) +- remove python.d/springboot [\#14075](https://github.com/netdata/netdata/pull/14075) ([ilyam8](https://github.com/ilyam8)) +- fix backfilling statistics [\#14074](https://github.com/netdata/netdata/pull/14074) ([ktsaou](https://github.com/ktsaou)) +- Add workflow dispatch trigger for parent/child with cloud integration smoke tests [\#14070](https://github.com/netdata/netdata/pull/14070) ([dimko](https://github.com/dimko)) + ## [v1.37.0](https://github.com/netdata/netdata/tree/v1.37.0) (2022-11-30) [Full Changelog](https://github.com/netdata/netdata/compare/v1.36.1...v1.37.0) @@ -298,7 +315,6 @@ - Add Fedora 37 to CI and package builds. [\#13489](https://github.com/netdata/netdata/pull/13489) ([Ferroin](https://github.com/Ferroin)) - Overhaul handling of installation of Netdata as a system service. 
[\#13451](https://github.com/netdata/netdata/pull/13451) ([Ferroin](https://github.com/Ferroin)) - reduce memcpy and memory usage on mqtt5 [\#13450](https://github.com/netdata/netdata/pull/13450) ([underhood](https://github.com/underhood)) -- Remove Alpine 3.13 from CI and official support. [\#13415](https://github.com/netdata/netdata/pull/13415) ([Ferroin](https://github.com/Ferroin)) ## [v1.36.1](https://github.com/netdata/netdata/tree/v1.36.1) (2022-08-15) @@ -347,13 +363,6 @@ - Dont duplicate buffered bytes [\#13435](https://github.com/netdata/netdata/pull/13435) ([vlvkobal](https://github.com/vlvkobal)) - Show last 15 alerts in notification [\#13434](https://github.com/netdata/netdata/pull/13434) ([MrZammler](https://github.com/MrZammler)) - Query queue only for queries [\#13431](https://github.com/netdata/netdata/pull/13431) ([underhood](https://github.com/underhood)) -- Remove octopus from demo-sites [\#13423](https://github.com/netdata/netdata/pull/13423) ([cakrit](https://github.com/cakrit)) -- Tiering statistics API endpoint [\#13420](https://github.com/netdata/netdata/pull/13420) ([ktsaou](https://github.com/ktsaou)) -- add discord, youtube, linkedin links to README [\#13419](https://github.com/netdata/netdata/pull/13419) ([andrewm4894](https://github.com/andrewm4894)) -- add ML bullet point to features section on README [\#13418](https://github.com/netdata/netdata/pull/13418) ([andrewm4894](https://github.com/andrewm4894)) -- Set value to SN\_EMPTY\_SLOT if flags is SN\_EMPTY\_SLOT [\#13417](https://github.com/netdata/netdata/pull/13417) ([MrZammler](https://github.com/MrZammler)) -- Add missing comma \(handle coverity warning CID 379360\) [\#13413](https://github.com/netdata/netdata/pull/13413) ([stelfrag](https://github.com/stelfrag)) -- codacy/lgtm ignore judy sources [\#13411](https://github.com/netdata/netdata/pull/13411) ([underhood](https://github.com/underhood)) ## [v1.35.1](https://github.com/netdata/netdata/tree/v1.35.1) (2022-06-10) diff 
--git a/collectors/diskspace.plugin/plugin_diskspace.c b/collectors/diskspace.plugin/plugin_diskspace.c index 5f610983b..e806a3360 100644 --- a/collectors/diskspace.plugin/plugin_diskspace.c +++ b/collectors/diskspace.plugin/plugin_diskspace.c @@ -320,7 +320,7 @@ static inline void do_disk_space_stats(struct mountinfo *mi, int update_every) { , SIMPLE_PATTERN_EXACT ); - dict_mountpoints = dictionary_create(DICT_OPTION_SINGLE_THREADED); + dict_mountpoints = dictionary_create(DICT_OPTION_NONE); } struct mount_point_metadata *m = dictionary_get(dict_mountpoints, mi->mount_point); diff --git a/collectors/proc.plugin/ipc.c b/collectors/proc.plugin/ipc.c index 9185894eb..7d3d2ecbb 100644 --- a/collectors/proc.plugin/ipc.c +++ b/collectors/proc.plugin/ipc.c @@ -195,7 +195,7 @@ int ipc_msq_get_info(char *msg_filename, struct message_queue **message_queue_ro size_t words = 0; if(unlikely(lines < 2)) { - error("Cannot read %s. Expected 2 or more lines, read %zu.", ff->filename, lines); + error("Cannot read %s. Expected 2 or more lines, read %zu.", procfile_filename(ff), lines); return 1; } @@ -205,7 +205,7 @@ int ipc_msq_get_info(char *msg_filename, struct message_queue **message_queue_ro words = procfile_linewords(ff, l); if(unlikely(words < 2)) continue; if(unlikely(words < 14)) { - error("Cannot read %s line. Expected 14 params, read %zu.", ff->filename, words); + error("Cannot read %s line. Expected 14 params, read %zu.", procfile_filename(ff), words); continue; } @@ -250,7 +250,7 @@ int ipc_shm_get_info(char *shm_filename, struct shm_stats *shm) { size_t words = 0; if(unlikely(lines < 2)) { - error("Cannot read %s. Expected 2 or more lines, read %zu.", ff->filename, lines); + error("Cannot read %s. 
Expected 2 or more lines, read %zu.", procfile_filename(ff), lines); return 1; } @@ -263,7 +263,7 @@ int ipc_shm_get_info(char *shm_filename, struct shm_stats *shm) { words = procfile_linewords(ff, l); if(unlikely(words < 2)) continue; if(unlikely(words < 16)) { - error("Cannot read %s line. Expected 16 params, read %zu.", ff->filename, words); + error("Cannot read %s line. Expected 16 params, read %zu.", procfile_filename(ff), words); continue; } diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index 53fd6c45a..a4e9d321f 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -12,9 +12,10 @@ #define WORKER_JOB_STRINGS 5 #define WORKER_JOB_DICTIONARIES 6 #define WORKER_JOB_MALLOC_TRACE 7 +#define WORKER_JOB_SQLITE3 8 -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 8 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 8 +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 9 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 9 #endif bool global_statistics_enabled = true; @@ -60,21 +61,6 @@ static struct global_statistics { uint64_t db_points_stored_per_tier[RRD_STORAGE_TIERS]; - uint64_t sqlite3_queries_made; - uint64_t sqlite3_queries_ok; - uint64_t sqlite3_queries_failed; - uint64_t sqlite3_queries_failed_busy; - uint64_t sqlite3_queries_failed_locked; - uint64_t sqlite3_rows; - uint64_t sqlite3_metadata_cache_hit; - uint64_t sqlite3_context_cache_hit; - uint64_t sqlite3_metadata_cache_miss; - uint64_t sqlite3_context_cache_miss; - uint64_t sqlite3_metadata_cache_spill; - uint64_t sqlite3_context_cache_spill; - uint64_t sqlite3_metadata_cache_write; - uint64_t sqlite3_context_cache_write; - } global_statistics = { .connected_clients = 0, .web_requests = 0, @@ -112,27 +98,6 @@ void global_statistics_backfill_query_completed(size_t points_read) { __atomic_fetch_add(&global_statistics.backfill_db_points_read, points_read, __ATOMIC_RELAXED); } -void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked) { - 
__atomic_fetch_add(&global_statistics.sqlite3_queries_made, 1, __ATOMIC_RELAXED); - - if(success) { - __atomic_fetch_add(&global_statistics.sqlite3_queries_ok, 1, __ATOMIC_RELAXED); - } - else { - __atomic_fetch_add(&global_statistics.sqlite3_queries_failed, 1, __ATOMIC_RELAXED); - - if(busy) - __atomic_fetch_add(&global_statistics.sqlite3_queries_failed_busy, 1, __ATOMIC_RELAXED); - - if(locked) - __atomic_fetch_add(&global_statistics.sqlite3_queries_failed_locked, 1, __ATOMIC_RELAXED); - } -} - -void global_statistics_sqlite3_row_completed(void) { - __atomic_fetch_add(&global_statistics.sqlite3_rows, 1, __ATOMIC_RELAXED); -} - void global_statistics_rrdr_query_completed(size_t queries, uint64_t db_points_read, uint64_t result_points_generated, QUERY_SOURCE query_source) { switch(query_source) { case QUERY_SOURCE_API_DATA: @@ -241,25 +206,6 @@ static inline void global_statistics_copy(struct global_statistics *gs, uint8_t uint64_t n = 0; __atomic_compare_exchange(&global_statistics.web_usec_max, (uint64_t *) &gs->web_usec_max, &n, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } - - gs->sqlite3_queries_made = __atomic_load_n(&global_statistics.sqlite3_queries_made, __ATOMIC_RELAXED); - gs->sqlite3_queries_ok = __atomic_load_n(&global_statistics.sqlite3_queries_ok, __ATOMIC_RELAXED); - gs->sqlite3_queries_failed = __atomic_load_n(&global_statistics.sqlite3_queries_failed, __ATOMIC_RELAXED); - gs->sqlite3_queries_failed_busy = __atomic_load_n(&global_statistics.sqlite3_queries_failed_busy, __ATOMIC_RELAXED); - gs->sqlite3_queries_failed_locked = __atomic_load_n(&global_statistics.sqlite3_queries_failed_locked, __ATOMIC_RELAXED); - gs->sqlite3_rows = __atomic_load_n(&global_statistics.sqlite3_rows, __ATOMIC_RELAXED); - - gs->sqlite3_metadata_cache_hit = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_HIT); - gs->sqlite3_context_cache_hit = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_HIT); - - gs->sqlite3_metadata_cache_miss = (uint64_t) 
sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_MISS); - gs->sqlite3_context_cache_miss = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_MISS); - - gs->sqlite3_metadata_cache_spill = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL); - gs->sqlite3_context_cache_spill = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL); - - gs->sqlite3_metadata_cache_write = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE); - gs->sqlite3_context_cache_write = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE); } static void global_statistics_charts(void) { @@ -707,8 +653,129 @@ static void global_statistics_charts(void) { rrdset_done(st_points_stored); } +} - // ---------------------------------------------------------------- +// ---------------------------------------------------------------------------- +// sqlite3 statistics + +struct sqlite3_statistics { + uint64_t sqlite3_queries_made; + uint64_t sqlite3_queries_ok; + uint64_t sqlite3_queries_failed; + uint64_t sqlite3_queries_failed_busy; + uint64_t sqlite3_queries_failed_locked; + uint64_t sqlite3_rows; + uint64_t sqlite3_metadata_cache_hit; + uint64_t sqlite3_context_cache_hit; + uint64_t sqlite3_metadata_cache_miss; + uint64_t sqlite3_context_cache_miss; + uint64_t sqlite3_metadata_cache_spill; + uint64_t sqlite3_context_cache_spill; + uint64_t sqlite3_metadata_cache_write; + uint64_t sqlite3_context_cache_write; + +} sqlite3_statistics = { }; + +void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked) { + __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_made, 1, __ATOMIC_RELAXED); + + if(success) { + __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_ok, 1, __ATOMIC_RELAXED); + } + else { + __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed, 1, __ATOMIC_RELAXED); + + if(busy) + __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed_busy, 1, __ATOMIC_RELAXED); + + if(locked) + 
__atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed_locked, 1, __ATOMIC_RELAXED); + } +} + +void global_statistics_sqlite3_row_completed(void) { + __atomic_fetch_add(&sqlite3_statistics.sqlite3_rows, 1, __ATOMIC_RELAXED); +} + +static inline void sqlite3_statistics_copy(struct sqlite3_statistics *gs) { + static usec_t last_run = 0; + + gs->sqlite3_queries_made = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_made, __ATOMIC_RELAXED); + gs->sqlite3_queries_ok = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_ok, __ATOMIC_RELAXED); + gs->sqlite3_queries_failed = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed, __ATOMIC_RELAXED); + gs->sqlite3_queries_failed_busy = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed_busy, __ATOMIC_RELAXED); + gs->sqlite3_queries_failed_locked = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed_locked, __ATOMIC_RELAXED); + gs->sqlite3_rows = __atomic_load_n(&sqlite3_statistics.sqlite3_rows, __ATOMIC_RELAXED); + + usec_t timeout = default_rrd_update_every * USEC_PER_SEC + default_rrd_update_every * USEC_PER_SEC / 3; + usec_t now = now_monotonic_usec(); + if(!last_run) + last_run = now; + usec_t delta = now - last_run; + bool query_sqlite3 = delta < timeout; + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_metadata_cache_hit = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_HIT); + else { + gs->sqlite3_metadata_cache_hit = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_context_cache_hit = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_HIT); + else { + gs->sqlite3_context_cache_hit = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_metadata_cache_miss = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_MISS); + else { + gs->sqlite3_metadata_cache_miss = UINT64_MAX; + query_sqlite3 = false; 
+ } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_context_cache_miss = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_MISS); + else { + gs->sqlite3_context_cache_miss = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_metadata_cache_spill = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL); + else { + gs->sqlite3_metadata_cache_spill = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_context_cache_spill = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL); + else { + gs->sqlite3_context_cache_spill = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_metadata_cache_write = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE); + else { + gs->sqlite3_metadata_cache_write = UINT64_MAX; + query_sqlite3 = false; + } + + if(query_sqlite3 && now_monotonic_usec() - last_run < timeout) + gs->sqlite3_context_cache_write = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE); + else { + gs->sqlite3_context_cache_write = UINT64_MAX; + query_sqlite3 = false; + } + + last_run = now_monotonic_usec(); +} + +static void sqlite3_statistics_charts(void) { + struct sqlite3_statistics gs; + sqlite3_statistics_copy(&gs); if(gs.sqlite3_queries_made) { static RRDSET *st_sqlite3_queries = NULL; @@ -833,10 +900,17 @@ static void global_statistics_charts(void) { rd_cache_write = rrddim_add(st_sqlite3_cache, "cache_write", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); } - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_metadata_cache_hit); - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_metadata_cache_miss); - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_metadata_cache_spill); - 
rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_metadata_cache_write); + if(gs.sqlite3_metadata_cache_hit != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_metadata_cache_hit); + + if(gs.sqlite3_metadata_cache_miss != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_metadata_cache_miss); + + if(gs.sqlite3_metadata_cache_spill != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_metadata_cache_spill); + + if(gs.sqlite3_metadata_cache_write != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_metadata_cache_write); rrdset_done(st_sqlite3_cache); } @@ -870,10 +944,17 @@ static void global_statistics_charts(void) { rd_cache_write = rrddim_add(st_sqlite3_cache, "cache_write", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); } - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_context_cache_hit); - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_context_cache_miss); - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_context_cache_spill); - rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_context_cache_write); + if(gs.sqlite3_context_cache_hit != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_context_cache_hit); + + if(gs.sqlite3_context_cache_miss != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_context_cache_miss); + + if(gs.sqlite3_context_cache_spill != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_context_cache_spill); + + if(gs.sqlite3_context_cache_write != UINT64_MAX) + rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, 
(collected_number)gs.sqlite3_context_cache_write); rrdset_done(st_sqlite3_cache); } @@ -2322,7 +2403,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.value.%s", wu->name_lowercase, job_name_sanitized); char title[1000 + 1]; - snprintf(title, 1000, "Netdata Workers %s Value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name)); + snprintf(title, 1000, "Netdata Workers %s value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name)); wu->per_job_type[i].st = rrdset_create_localhost( "netdata" @@ -2334,7 +2415,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { , (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"value" , "netdata" , "stats" - , wu->priority + 5 + , wu->priority + 5 + i , localhost->rrd_update_every , RRDSET_TYPE_LINE ); @@ -2378,7 +2459,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.rate.%s", wu->name_lowercase, job_name_sanitized); char title[1000 + 1]; - snprintf(title, 1000, "Netdata Workers %s Rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name)); + snprintf(title, 1000, "Netdata Workers %s rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name)); wu->per_job_type[i].st = rrdset_create_localhost( "netdata" @@ -2390,7 +2471,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { , (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"rate" , "netdata" , "stats" - , wu->priority + 5 + , wu->priority + 5 + i , localhost->rrd_update_every , RRDSET_TYPE_LINE ); @@ -2447,21 +2528,37 @@ static void workers_utilization_reset_statistics(struct worker_utilization *wu) } } +#define TASK_STAT_PREFIX "/proc/self/task/" +#define TASK_STAT_SUFFIX "/stat" + static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_uint_t 
*utime __maybe_unused, kernel_uint_t *stime __maybe_unused) { #ifdef __linux__ - char filename[200 + 1]; - snprintfz(filename, 200, "/proc/self/task/%d/stat", pid); + static char filename[sizeof(TASK_STAT_PREFIX) + sizeof(TASK_STAT_SUFFIX) + 20] = TASK_STAT_PREFIX; + static size_t start_pos = sizeof(TASK_STAT_PREFIX) - 1; + static procfile *ff = NULL; - procfile *ff = procfile_open(filename, " ", PROCFILE_FLAG_NO_ERROR_ON_FILE_IO); - if(!ff) return -1; + // construct the filename + size_t end_pos = snprintfz(&filename[start_pos], 20, "%d", pid); + strcpy(&filename[start_pos + end_pos], TASK_STAT_SUFFIX); + // (re)open the procfile to the new filename + bool set_quotes = (ff == NULL) ? true : false; + ff = procfile_reopen(ff, filename, NULL, PROCFILE_FLAG_DEFAULT); + if(unlikely(!ff)) return -1; + + if(set_quotes) + procfile_set_open_close(ff, "(", ")"); + + // read the entire file and split it to lines and words ff = procfile_readall(ff); - if(!ff) return -1; + if(unlikely(!ff)) return -1; + // parse the numbers we are interested *utime = str2kernel_uint_t(procfile_lineword(ff, 0, 13)); *stime = str2kernel_uint_t(procfile_lineword(ff, 0, 14)); - procfile_close(ff); + // leave the file open for the next iteration + return 0; #else // TODO: add here cpu time detection per thread, for FreeBSD and MacOS @@ -2474,8 +2571,6 @@ static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_ static Pvoid_t workers_by_pid_JudyL_array = NULL; static void workers_threads_cleanup(struct worker_utilization *wu) { - netdata_thread_disable_cancelability(); - struct worker_thread *t = wu->threads; while(t) { struct worker_thread *next = t->next; @@ -2487,8 +2582,6 @@ static void workers_threads_cleanup(struct worker_utilization *wu) { } t = next; } - - netdata_thread_enable_cancelability(); } static struct worker_thread *worker_thread_find(struct worker_utilization *wu __maybe_unused, pid_t pid) { @@ -2621,16 +2714,20 @@ static void worker_utilization_charts(void) 
{ static size_t iterations = 0; iterations++; - int i; - for(i = 0; all_workers_utilization[i].name ;i++) { + for(int i = 0; all_workers_utilization[i].name ;i++) { workers_utilization_reset_statistics(&all_workers_utilization[i]); + + netdata_thread_disable_cancelability(); workers_foreach(all_workers_utilization[i].name, worker_utilization_charts_callback, &all_workers_utilization[i]); + netdata_thread_enable_cancelability(); // skip the first iteration, so that we don't accumulate startup utilization to our charts if(likely(iterations > 1)) workers_utilization_update_chart(&all_workers_utilization[i]); + netdata_thread_disable_cancelability(); workers_threads_cleanup(&all_workers_utilization[i]); + netdata_thread_enable_cancelability(); } workers_total_cpu_utilization_chart(); @@ -2672,11 +2769,12 @@ static void global_statistics_register_workers(void) { worker_register("STATS"); worker_register_job_name(WORKER_JOB_GLOBAL, "global"); worker_register_job_name(WORKER_JOB_REGISTRY, "registry"); - worker_register_job_name(WORKER_JOB_WORKERS, "workers"); worker_register_job_name(WORKER_JOB_DBENGINE, "dbengine"); worker_register_job_name(WORKER_JOB_STRINGS, "strings"); worker_register_job_name(WORKER_JOB_DICTIONARIES, "dictionaries"); worker_register_job_name(WORKER_JOB_MALLOC_TRACE, "malloc_trace"); + worker_register_job_name(WORKER_JOB_WORKERS, "workers"); + worker_register_job_name(WORKER_JOB_SQLITE3, "sqlite3"); } static void global_statistics_cleanup(void *ptr) @@ -2719,6 +2817,9 @@ void *global_statistics_main(void *ptr) worker_is_busy(WORKER_JOB_GLOBAL); global_statistics_charts(); + worker_is_busy(WORKER_JOB_SQLITE3); + sqlite3_statistics_charts(); + worker_is_busy(WORKER_JOB_REGISTRY); registry_statistics(); diff --git a/daemon/service.c b/daemon/service.c index a7db7ceb7..6db2ef69f 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -46,19 +46,19 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) { /* only a collector can mark a chart as obsolete, 
so we must remove the reference */ - size_t tiers_available = 0, tiers_said_yes = 0; + size_t tiers_available = 0, tiers_said_no_retention = 0; for(size_t tier = 0; tier < storage_tiers ;tier++) { if(rd->tiers[tier]) { tiers_available++; if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle)) - tiers_said_yes++; + tiers_said_no_retention++; rd->tiers[tier]->db_collection_handle = NULL; } } - if (tiers_available == tiers_said_yes && tiers_said_yes) { + if (tiers_available == tiers_said_no_retention && tiers_said_no_retention) { /* This metric has no data and no references */ metaqueue_delete_dimension_uuid(&rd->metric_uuid); } diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index d65cb35a5..4f5da7084 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -524,6 +524,14 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d } rrdeng_page_descr_mutex_unlock(ctx, descr); + while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + pg_cache_wait_event_unsafe(descr); + } + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { /* only after locking can it be safely deleted from LRU */ pg_cache_replaceQ_delete(ctx, descr); @@ -1196,7 +1204,6 @@ struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_insta page_index->refcount = 0; page_index->writers = 0; page_index->ctx = ctx; - page_index->alignment = NULL; page_index->latest_update_every_s = default_rrd_update_every; return page_index; diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 2f4d6b332..635b02123 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -112,7 +112,6 @@ struct pg_cache_page_index { usec_t 
latest_time_ut; struct rrdengine_instance *ctx; - struct pg_alignment *alignment; uint32_t latest_update_every_s; struct pg_cache_page_index *prev; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index e4cd37e98..a6840f38c 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -272,9 +272,19 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) } } +struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s) +{ + uv_rwlock_rdlock(&page_index->lock); + Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0); + struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue; + uv_rwlock_rdunlock(&page_index->lock); + return descr; +}; + static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) { struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; @@ -365,19 +375,30 @@ after_crc_check: } } + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t)); + struct pg_cache_page_index *page_index = likely( NULL != PValue) ? 
*PValue : NULL; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) { uint8_t is_prefetched_page; descr = NULL; for (j = 0 ; j < xt_io_descr->descr_count; ++j) { - struct rrdeng_page_descr *descrj; + struct rrdeng_page_descr descrj; - descrj = xt_io_descr->descr_array[j]; + descrj = xt_io_descr->descr_read_array[j]; /* care, we don't hold the descriptor mutex */ - if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) && - header->descr[i].page_length == descrj->page_length && - header->descr[i].start_time_ut == descrj->start_time_ut && - header->descr[i].end_time_ut == descrj->end_time_ut) { - descr = descrj; + if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) && + header->descr[i].page_length == descrj.page_length && + header->descr[i].start_time_ut == descrj.start_time_ut && + header->descr[i].end_time_ut == descrj.end_time_ut) { + //descr = descrj; + descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC)); + if (unlikely(!descr)) { + error_limit_static_thread_var(erl, 1, 0); + error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__); + } break; } } @@ -506,6 +527,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); xt_io_descr->descr_array[i] = descr[i]; + xt_io_descr->descr_read_array[i] = *(descr[i]); } xt_io_descr->descr_count = count; xt_io_descr->file = datafile->file; diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index fedadbe86..521d2521a 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -41,6 +41,7 @@ struct rrdeng_collect_handle { unsigned long page_correlation_id; // set to 1 when this dimension is not page aligned with the other dimensions in the chart uint8_t unaligned_page; + struct pg_alignment *alignment; }; 
struct rrdeng_query_handle { @@ -117,6 +118,7 @@ struct extent_io_descriptor { unsigned descr_count; int release_descr; struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT]; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */ }; diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 27503baee..4525b041f 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -39,21 +39,35 @@ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; // ---------------------------------------------------------------------------- // metrics groups +static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) { + if(unlikely(!pa)) return; + __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); +} + +static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) { + if(unlikely(!pa)) return true; + + if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) { + freez(pa); + return true; + } + + return false; +} + +// charts call this STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { - return callocz(1, sizeof(struct pg_alignment)); + struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment)); + rrdeng_page_alignment_acquire(pa); + return (STORAGE_METRICS_GROUP *)pa; } -void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { - if(!smg) return; +// charts call this +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) { + if(unlikely(!smg)) return; - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; struct pg_alignment *pa = (struct pg_alignment *)smg; - struct page_cache *pg_cache = &ctx->pg_cache; - - 
uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - if(pa->refcount == 0) - freez(pa); - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + rrdeng_page_alignment_release(pa); } // ---------------------------------------------------------------------------- @@ -93,10 +107,10 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) { uuid_t legacy_uuid; rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); - return rrdeng_metric_get(db_instance, &legacy_uuid, smg); + return rrdeng_metric_get(db_instance, &legacy_uuid); } // ---------------------------------------------------------------------------- @@ -105,11 +119,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, c void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; - unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); - if(refcount == 0 && page_index->alignment) { - __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST); - page_index->alignment = NULL; - } + __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); } STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { @@ -118,9 +128,8 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle return db_metric_handle; } -STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { struct rrdengine_instance *ctx = (struct 
rrdengine_instance *)db_instance; - struct pg_alignment *pa = (struct pg_alignment *)smg; struct page_cache *pg_cache = &ctx->pg_cache; struct pg_cache_page_index *page_index = NULL; @@ -130,27 +139,16 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t * page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (likely(page_index)) { + if (likely(page_index)) __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); - if(pa) { - if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0) - fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).", - page_index->refcount, page_index->writers); - - page_index->alignment = pa; - __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); - } - } - return (STORAGE_METRIC_HANDLE *)page_index; } -STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; - struct pg_alignment *pa = (struct pg_alignment *)smg; struct pg_cache_page_index *page_index; struct page_cache *pg_cache = &ctx->pg_cache; @@ -160,28 +158,25 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_ *PValue = page_index = create_page_index(uuid, ctx); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; - page_index->alignment = pa; page_index->refcount = 1; - if(pa) - pa->refcount++; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); return (STORAGE_METRIC_HANDLE *)page_index; } -STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, 
STORAGE_INSTANCE *db_instance) { STORAGE_METRIC_HANDLE *db_metric_handle; - db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg); + db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid); if(!db_metric_handle) { - db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg); + db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset)); if(db_metric_handle) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; uuid_copy(rd->metric_uuid, page_index->id); } } if(!db_metric_handle) - db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg); + db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid); #ifdef NETDATA_INTERNAL_CHECKS struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; @@ -210,19 +205,19 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). 
*/ -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) { +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; struct rrdeng_collect_handle *handle; - if(!page_index->alignment) - fatal("DBENGINE: metric group is required for collect operations"); - handle = callocz(1, sizeof(struct rrdeng_collect_handle)); handle->page_index = page_index; handle->descr = NULL; handle->unaligned_page = 0; page_index->latest_update_every_s = update_every; + handle->alignment = (struct pg_alignment *)smg; + rrdeng_page_alignment_acquire(handle->alignment); + uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); @@ -331,18 +326,18 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection } #endif - if (descr->page_length == page_index->alignment->page_length) { + if (descr->page_length == handle->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) { handle->unaligned_page = 1; print_page_cache_descr(descr, "Metric page is not aligned with chart", true); } if (unlikely(handle->unaligned_page && /* did the other metrics change page? 
*/ - page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { + handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true); must_flush_unaligned_page = 1; handle->unaligned_page = 0; @@ -365,7 +360,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); - if (0 == page_index->alignment->page_length) { + if (0 == handle->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } @@ -403,7 +398,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) - page_index->alignment->page_length = descr->page_length; + handle->alignment->page_length = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time_ut)) { unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; descr->start_time_ut = point_in_time_ut; @@ -530,10 +525,13 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { rrdeng_store_metric_flush_current_page(collection_handle); uv_rwlock_wrlock(&page_index->lock); - if (!--page_index->writers && !page_index->page_count) { + + if (!--page_index->writers && !page_index->page_count) can_delete_metric = 1; - } + uv_rwlock_wrunlock(&page_index->lock); + + rrdeng_page_alignment_release(handle->alignment); freez(handle); return can_delete_metric; diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 85375044f..3acee4ec6 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -42,14 +42,14 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu uuid_t *ret_uuid); 
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance); +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id); void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every); +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c index 43f32350b..299b6557a 100644 --- a/database/ram/rrddim_mem.c +++ b/database/ram/rrddim_mem.c @@ -22,8 +22,8 @@ void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, // RRDDIM legacy data collection functions STORAGE_METRIC_HANDLE * -rrddim_metric_get_or_create(RRDDIM *rd, 
STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) { - STORAGE_METRIC_HANDLE *t = rrddim_metric_get(db_instance, &rd->metric_uuid, smg); +rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) { + STORAGE_METRIC_HANDLE *t = rrddim_metric_get(db_instance, &rd->metric_uuid); if(!t) { netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock); Pvoid_t *PValue = JudyHSIns(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0); @@ -40,7 +40,7 @@ rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_un } STORAGE_METRIC_HANDLE * -rrddim_metric_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid, STORAGE_METRICS_GROUP *smg __maybe_unused) { +rrddim_metric_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid) { RRDDIM *rd = NULL; netdata_rwlock_rdlock(&rrddim_JudyHS_rwlock); Pvoid_t *PValue = JudyHSGet(rrddim_JudyHS_array, uuid, sizeof(uuid_t)); @@ -67,7 +67,7 @@ void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *col rrddim_store_metric_flush(collection_handle); } -STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every __maybe_unused) { +STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) { RRDDIM *rd = (RRDDIM *)db_metric_handle; rd->db[rd->rrdset->current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); struct mem_collect_handle *ch = callocz(1, sizeof(struct mem_collect_handle)); diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h index 297388f51..79c59f110 100644 --- a/database/ram/rrddim_mem.h +++ b/database/ram/rrddim_mem.h @@ -20,15 +20,15 @@ struct mem_query_handle { size_t last_slot; }; -STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE 
*rrddim_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance); +STORAGE_METRIC_HANDLE *rrddim_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); STORAGE_METRIC_HANDLE *rrddim_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); void rrddim_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); -STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every); +STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value, diff --git a/database/rrd.h b/database/rrd.h index f071ee254..0796ff901 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -409,7 +409,7 @@ typedef struct storage_query_handle STORAGE_QUERY_HANDLE; // function pointers that handle data collection struct storage_engine_collect_ops { // an initialization function to run before starting collection - STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every); + STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); // run this to store each metric into the database void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value, @@ -464,8 +464,8 @@ typedef struct storage_engine STORAGE_ENGINE; // function pointers for all APIs provided by 
a storage engine typedef struct storage_engine_api { // metric management - STORAGE_METRIC_HANDLE *(*metric_get)(STORAGE_INSTANCE *instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); - STORAGE_METRIC_HANDLE *(*metric_get_or_create)(RRDDIM *rd, STORAGE_INSTANCE *instance, STORAGE_METRICS_GROUP *smg); + STORAGE_METRIC_HANDLE *(*metric_get)(STORAGE_INSTANCE *instance, uuid_t *uuid); + STORAGE_METRIC_HANDLE *(*metric_get_or_create)(RRDDIM *rd, STORAGE_INSTANCE *instance); void (*metric_release)(STORAGE_METRIC_HANDLE *); STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *); diff --git a/database/rrdcontext.c b/database/rrdcontext.c index cfa8af3e0..3413d1ea8 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -29,7 +29,7 @@ #define WORKER_JOB_PP_QUEUE_SIZE 13 -typedef enum { +typedef enum __attribute__ ((__packed__)) { RRD_FLAG_NONE = 0, RRD_FLAG_DELETED = (1 << 0), // this is a deleted object (metrics, instances, contexts) RRD_FLAG_COLLECTED = (1 << 1), // this object is currently being collected @@ -115,6 +115,7 @@ typedef enum { static inline void rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) { RRD_FLAGS expected, desired; + do { expected = *flags; @@ -2435,7 +2436,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED if(rm->rrddim && rm->rrddim->tiers[tier] && rm->rrddim->tiers[tier]->db_metric_handle) tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier]->db_metric_handle); else - tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid, NULL); + tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid); if(tier_retention[tier].db_metric_handle) { tier_retention[tier].db_first_time_t = tier_retention[tier].eng->api.query_ops.oldest_time(tier_retention[tier].db_metric_handle); @@ -3253,8 +3254,6 @@ static void 
rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo "RRDCONTEXT: context '%s' of host '%s', deleted from rrdmetrics dictionary.", string2str(rc->id), rrdhost_hostname(host)); - - fprintf(stderr, "RRDCONTEXT: deleted context '%s'", string2str(rc->id)); } // the item is referenced in the dictionary diff --git a/database/rrddim.c b/database/rrddim.c index 1b3d9952c..2d909a701 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -112,7 +112,7 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v rd->tiers[tier]->tier_grouping = host->db[tier].tier_grouping; rd->tiers[tier]->collect_ops = &eng->api.collect_ops; rd->tiers[tier]->query_ops = &eng->api.query_ops; - rd->tiers[tier]->db_metric_handle = eng->api.metric_get_or_create(rd, host->db[tier].instance, rd->rrdset->storage_metrics_groups[tier]); + rd->tiers[tier]->db_metric_handle = eng->api.metric_get_or_create(rd, host->db[tier].instance); storage_point_unset(rd->tiers[tier]->virtual_point); initialized++; @@ -131,7 +131,7 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v size_t initialized = 0; for (size_t tier = 0; tier < storage_tiers; tier++) { if (rd->tiers[tier]) { - rd->tiers[tier]->db_collection_handle = rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every); + rd->tiers[tier]->db_collection_handle = rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every, rd->rrdset->storage_metrics_groups[tier]); initialized++; } } @@ -195,19 +195,19 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v debug(D_RRD_CALLS, "rrddim_free() %s.%s", rrdset_name(st), rrddim_name(rd)); - size_t tiers_available = 0, tiers_said_yes = 0; + size_t tiers_available = 0, tiers_said_no_retention = 0; for(size_t tier = 0; tier < storage_tiers ;tier++) { if(rd->tiers[tier] && 
rd->tiers[tier]->db_collection_handle) { tiers_available++; if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle)) - tiers_said_yes++; + tiers_said_no_retention++; rd->tiers[tier]->db_collection_handle = NULL; } } - if (tiers_available == tiers_said_yes && tiers_said_yes && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { + if (tiers_available == tiers_said_no_retention && tiers_said_no_retention && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { /* This metric has no data and no references */ metaqueue_delete_dimension_uuid(&rd->metric_uuid); } @@ -261,7 +261,7 @@ static bool rrddim_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, for(size_t tier = 0; tier < storage_tiers ;tier++) { if (rd->tiers[tier] && !rd->tiers[tier]->db_collection_handle) rd->tiers[tier]->db_collection_handle = - rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every); + rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every, rd->rrdset->storage_metrics_groups[tier]); } if(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c index 9c7a61c6e..deca84584 100644 --- a/database/sqlite/sqlite_context.c +++ b/database/sqlite/sqlite_context.c @@ -449,7 +449,9 @@ skip_delete: int sql_context_cache_stats(int op) { int count, dummy; + netdata_thread_disable_cancelability(); sqlite3_db_status(db_context_meta, op, &count, &dummy, 0); + netdata_thread_enable_cancelability(); return count; } diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index eeb3c3822..ce5487fbf 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -1263,7 +1263,9 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b int sql_metadata_cache_stats(int op) { int count, dummy; + 
netdata_thread_disable_cancelability(); sqlite3_db_status(db_meta, op, &count, &dummy, 0); + netdata_thread_enable_cancelability(); return count; } diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index f03da25ab..0277e067f 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -3,10 +3,9 @@ #define DICTIONARY_INTERNALS #include "../libnetdata.h" -#include // runtime flags of the dictionary - must be checked with atomics -typedef enum { +typedef enum __attribute__ ((__packed__)) { DICT_FLAG_NONE = 0, DICT_FLAG_DESTROYED = (1 << 0), // this dictionary has been destroyed } DICT_FLAGS; @@ -23,14 +22,14 @@ typedef enum { #define is_view_dictionary(dict) ((dict)->master) #define is_master_dictionary(dict) (!is_view_dictionary(dict)) -typedef enum item_options { +typedef enum __attribute__ ((__packed__)) item_options { ITEM_OPTION_NONE = 0, ITEM_OPTION_ALLOCATED_NAME = (1 << 0), // the name pointer is a STRING // IMPORTANT: This is 1-bit - to add more change ITEM_OPTIONS_BITS } ITEM_OPTIONS; -typedef enum item_flags { +typedef enum __attribute__ ((__packed__)) item_flags { ITEM_FLAG_NONE = 0, ITEM_FLAG_DELETED = (1 << 0), // this item is marked deleted, so it is not available for traversal (deleted from the index too) ITEM_FLAG_BEING_CREATED = (1 << 1), // this item is currently being created - this flag is removed when construction finishes @@ -623,7 +622,7 @@ static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM if(likely(!dict->hooks || !dict->hooks->del_callback)) return; - // We may execute the delete callback on items deleted from a view, + // We may execute delete callback on items deleted from a view, // because we may have references to it, after the master is gone // so, the shared structure will remain until the last reference is released. 
@@ -782,7 +781,7 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) { while(item) { examined++; - // this will cleanup + // this will clean up item_next = item->next; int rc = item_check_and_acquire_advanced(dict, item, is_view); @@ -882,7 +881,7 @@ static void item_acquire(DICTIONARY *dict, DICTIONARY_ITEM *item) { static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item) { // this function may be called without any lock on the dictionary - // or even when someone else has write lock on the dictionary + // or even when someone else has 'write' lock on the dictionary bool is_deleted; REFCOUNT refcount; @@ -934,11 +933,11 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it int ret = RC_ITEM_OK; + refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item); + do { spins++; - refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item); - if(refcount < 0) { // we can't use this item ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED; @@ -953,8 +952,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it desired = refcount + 1; - } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, - false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); // if ret == ITEM_OK, we acquired the item @@ -971,7 +969,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it else pointer_del(dict, item); - // mark it in our dictionary as deleted too + // mark it in our dictionary as deleted too, // this is safe to be done here, because we have got // a reference counter on item dict_item_set_deleted(dict, item); @@ -1013,11 +1011,11 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY int ret = RC_ITEM_OK; + refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item); + do { spins++; - refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item); - if(refcount < 0) { 
// we can't use this item ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED; @@ -1035,8 +1033,7 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY ret = RC_ITEM_IS_CURRENTLY_BEING_CREATED; break; } - } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, - false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); #ifdef NETDATA_INTERNAL_CHECKS if(ret == RC_ITEM_OK) @@ -1055,8 +1052,7 @@ static inline bool item_shared_release_and_check_if_it_can_be_freed(DICTIONARY * // if we can set refcount to REFCOUNT_DELETING, we can delete this item REFCOUNT links = __atomic_sub_fetch(&item->shared->links, 1, __ATOMIC_SEQ_CST); - if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING, - false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { + if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { // we can delete it return true; @@ -1430,16 +1426,16 @@ static void dict_item_shared_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) { ITEM_FLAGS expected, desired; + expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST); + do { - expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST); if (expected & ITEM_FLAG_DELETED) return false; desired = expected | ITEM_FLAG_DELETED; - } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired, - false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); DICTIONARY_ENTRIES_MINUS1(dict); return true; @@ -1474,7 +1470,7 @@ static inline void dict_item_free_or_mark_deleted(DICTIONARY *dict, DICTIONARY_I } // this is used by traversal functions to remove the current item -// if it is deleted 
and it has zero references. This will eliminate +// if it is deleted, and it has zero references. This will eliminate // the need for the garbage collector to kick-in later. // Most deletions happen during traversal, so this is a nice hack // to speed up everything! @@ -1601,7 +1597,7 @@ static DICTIONARY_ITEM *dict_item_add_or_reset_value_and_acquire(DICTIONARY *dic hashtable_inserted_item_unsafe(dict, item); // unlock the index lock, before we add it to the linked list - // DONT DO IT THE OTHER WAY AROUND - DO NOT CROSS THE LOCKS! + // DON'T DO IT THE OTHER WAY AROUND - DO NOT CROSS THE LOCKS! dictionary_index_wrlock_unlock(dict); item_linked_list_add(dict, item); diff --git a/libnetdata/procfile/procfile.c b/libnetdata/procfile/procfile.c index b4ca025ec..eb04316c3 100644 --- a/libnetdata/procfile/procfile.c +++ b/libnetdata/procfile/procfile.c @@ -22,16 +22,21 @@ size_t procfile_max_allocation = PROCFILE_INCREMENT_BUFFER; // ---------------------------------------------------------------------------- char *procfile_filename(procfile *ff) { - if(ff->filename[0]) return ff->filename; + if(ff->filename) + return ff->filename; + char filename[FILENAME_MAX + 1]; char buffer[FILENAME_MAX + 1]; snprintfz(buffer, FILENAME_MAX, "/proc/self/fd/%d", ff->fd); - ssize_t l = readlink(buffer, ff->filename, FILENAME_MAX); + ssize_t l = readlink(buffer, filename, FILENAME_MAX); if(unlikely(l == -1)) - snprintfz(ff->filename, FILENAME_MAX, "unknown filename for fd %d", ff->fd); + snprintfz(filename, FILENAME_MAX, "unknown filename for fd %d", ff->fd); else - ff->filename[l] = '\0'; + filename[l] = '\0'; + + + ff->filename = strdupz(filename); // on non-linux systems, something like this will be needed // fcntl(ff->fd, F_GETPATH, ff->filename) @@ -141,8 +146,9 @@ void procfile_close(procfile *ff) { debug(D_PROCFILE, PF_PREFIX ": Closing file '%s'", procfile_filename(ff)); - if(likely(ff->lines)) procfile_lines_free(ff->lines); - if(likely(ff->words)) 
procfile_words_free(ff->words); + freez(ff->filename); + procfile_lines_free(ff->lines); + procfile_words_free(ff->words); if(likely(ff->fd != -1)) close(ff->fd); freez(ff); @@ -319,40 +325,31 @@ procfile *procfile_readall(procfile *ff) { return ff; } -NOINLINE -static void procfile_set_separators(procfile *ff, const char *separators) { - static PF_CHAR_TYPE def[256]; - static char initialized = 0; - - if(unlikely(!initialized)) { - // this is thread safe - // if initialized is zero, multiple threads may be executing - // this code at the same time, setting in def[] the exact same values - int i = 256; - while(i--) { - if(unlikely(i == '\n' || i == '\r')) - def[i] = PF_CHAR_IS_NEWLINE; +static PF_CHAR_TYPE procfile_default_separators[256]; +__attribute__((constructor)) void procfile_initialize_default_separators(void) { + int i = 256; + while(i--) { + if(unlikely(i == '\n' || i == '\r')) + procfile_default_separators[i] = PF_CHAR_IS_NEWLINE; - else if(unlikely(isspace(i) || !isprint(i))) - def[i] = PF_CHAR_IS_SEPARATOR; + else if(unlikely(isspace(i) || !isprint(i))) + procfile_default_separators[i] = PF_CHAR_IS_SEPARATOR; - else - def[i] = PF_CHAR_IS_WORD; - } - - initialized = 1; + else + procfile_default_separators[i] = PF_CHAR_IS_WORD; } +} - // copy the default - PF_CHAR_TYPE *ffs = ff->separators, *ffd = def, *ffe = &def[256]; - while(ffd != ffe) - *ffs++ = *ffd++; - +NOINLINE +static void procfile_set_separators(procfile *ff, const char *separators) { // set the separators if(unlikely(!separators)) separators = " \t=|"; - ffs = ff->separators; + // copy the default + memcpy(ff->separators, procfile_default_separators, 256 * sizeof(PF_CHAR_TYPE)); + + PF_CHAR_TYPE *ffs = ff->separators; const char *s = separators; while(*s) ffs[(int)*s++] = PF_CHAR_IS_SEPARATOR; @@ -416,8 +413,7 @@ procfile *procfile_open(const char *filename, const char *separators, uint32_t f procfile *ff = mallocz(sizeof(procfile) + size); //strncpyz(ff->filename, filename, FILENAME_MAX); - 
ff->filename[0] = '\0'; - + ff->filename = NULL; ff->fd = fd; ff->size = size; ff->len = 0; @@ -449,7 +445,8 @@ procfile *procfile_reopen(procfile *ff, const char *filename, const char *separa // info("PROCFILE: opened '%s' on fd %d", filename, ff->fd); //strncpyz(ff->filename, filename, FILENAME_MAX); - ff->filename[0] = '\0'; + freez(ff->filename); + ff->filename = NULL; ff->flags = flags; // do not do the separators again if NULL is given diff --git a/libnetdata/procfile/procfile.h b/libnetdata/procfile/procfile.h index 5d45e4028..cae4ad484 100644 --- a/libnetdata/procfile/procfile.h +++ b/libnetdata/procfile/procfile.h @@ -37,7 +37,7 @@ typedef struct { #define PROCFILE_FLAG_DEFAULT 0x00000000 #define PROCFILE_FLAG_NO_ERROR_ON_FILE_IO 0x00000001 -typedef enum procfile_separator { +typedef enum __attribute__ ((__packed__)) procfile_separator { PF_CHAR_IS_SEPARATOR, PF_CHAR_IS_NEWLINE, PF_CHAR_IS_WORD, @@ -46,17 +46,16 @@ typedef enum procfile_separator { PF_CHAR_IS_CLOSE } PF_CHAR_TYPE; -typedef struct { - char filename[FILENAME_MAX + 1]; // not populated until profile_filename() is called - +typedef struct procfile { + char *filename; // not populated until procfile_filename() is called uint32_t flags; - int fd; // the file descriptor - size_t len; // the bytes we have placed into data - size_t size; // the bytes we have allocated for data + int fd; // the file descriptor + size_t len; // the bytes we have placed into data + size_t size; // the bytes we have allocated for data pflines *lines; pfwords *words; PF_CHAR_TYPE separators[256]; - char data[]; // allocated buffer to keep file contents + char data[]; // allocated buffer to keep file contents } procfile; // close the proc file and free all related memory diff --git a/libnetdata/socket/security.c b/libnetdata/socket/security.c index f7b44049b..88b3f6d93 100644 --- a/libnetdata/socket/security.c +++ b/libnetdata/socket/security.c @@ -204,31 +204,43 @@ static SSL_CTX * security_initialize_openssl_server() { 
* NETDATA_SSL_CONTEXT_EXPORTING - Starts the OpenTSDB context */ void security_start_ssl(int selector) { + static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; + netdata_spinlock_lock(&sp); + switch (selector) { case NETDATA_SSL_CONTEXT_SERVER: { - struct stat statbuf; - if (stat(netdata_ssl_security_key, &statbuf) || stat(netdata_ssl_security_cert, &statbuf)) { - info("To use encryption it is necessary to set \"ssl certificate\" and \"ssl key\" in [web] !\n"); - return; + if(!netdata_ssl_srv_ctx) { + struct stat statbuf; + if (stat(netdata_ssl_security_key, &statbuf) || stat(netdata_ssl_security_cert, &statbuf)) + info("To use encryption it is necessary to set \"ssl certificate\" and \"ssl key\" in [web] !\n"); + else { + netdata_ssl_srv_ctx = security_initialize_openssl_server(); + SSL_CTX_set_mode(netdata_ssl_srv_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); + } } - - netdata_ssl_srv_ctx = security_initialize_openssl_server(); - SSL_CTX_set_mode(netdata_ssl_srv_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); break; } + case NETDATA_SSL_CONTEXT_STREAMING: { - netdata_ssl_client_ctx = security_initialize_openssl_client(); - //This is necessary for the stream, because it is working sometimes with nonblock socket. - //It returns the bitmask after to change, there is not any description of errors in the documentation - SSL_CTX_set_mode( - netdata_ssl_client_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |SSL_MODE_AUTO_RETRY); + if(!netdata_ssl_client_ctx) { + netdata_ssl_client_ctx = security_initialize_openssl_client(); + //This is necessary for the stream, because it is working sometimes with nonblock socket. 
+ //It returns the bitmask after to change, there is not any description of errors in the documentation + SSL_CTX_set_mode(netdata_ssl_client_ctx, + SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER | + SSL_MODE_AUTO_RETRY); + } break; } + case NETDATA_SSL_CONTEXT_EXPORTING: { - netdata_ssl_exporting_ctx = security_initialize_openssl_client(); + if(!netdata_ssl_exporting_ctx) + netdata_ssl_exporting_ctx = security_initialize_openssl_client(); break; } } + + netdata_spinlock_unlock(&sp); } /** diff --git a/libnetdata/string/string.c b/libnetdata/string/string.c index a3f74b4ef..d2db8aab4 100644 --- a/libnetdata/string/string.c +++ b/libnetdata/string/string.c @@ -71,11 +71,12 @@ void string_statistics(size_t *inserts, size_t *deletes, size_t *searches, size_ static inline bool string_entry_check_and_acquire(STRING *se) { REFCOUNT expected, desired, count = 0; + + expected = __atomic_load_n(&se->refcount, __ATOMIC_SEQ_CST); + do { count++; - expected = __atomic_load_n(&se->refcount, __ATOMIC_SEQ_CST); - if(expected <= 0) { // We cannot use this. 
// The reference counter reached value zero, @@ -85,8 +86,8 @@ static inline bool string_entry_check_and_acquire(STRING *se) { } desired = expected + 1; - } - while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + + } while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); string_internal_stats_add(spins, count - 1); diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c index 14b8926e0..afaff209b 100644 --- a/libnetdata/worker_utilization/worker_utilization.c +++ b/libnetdata/worker_utilization/worker_utilization.c @@ -44,35 +44,55 @@ struct worker { struct worker *prev; }; -static netdata_mutex_t workers_base_lock = NETDATA_MUTEX_INITIALIZER; -static __thread struct worker *worker = NULL; -static Pvoid_t workers_per_workname_JudyHS_array = NULL; +struct workers_workname { // this is what we add to JudyHS + SPINLOCK spinlock; + struct worker *base; +}; + +static struct workers_globals { + SPINLOCK spinlock; + Pvoid_t worknames_JudyHS; + +} workers_globals = { // workers globals, the base of all worknames + .spinlock = NETDATA_SPINLOCK_INITIALIZER, // a lock for the worknames index + .worknames_JudyHS = NULL, // the worknames index +}; -void worker_register(const char *workname) { +static __thread struct worker *worker = NULL; // the current thread worker + +void worker_register(const char *name) { if(unlikely(worker)) return; worker = callocz(1, sizeof(struct worker)); worker->pid = gettid(); worker->tag = strdupz(netdata_thread_tag()); - worker->workname = strdupz(workname); + worker->workname = strdupz(name); usec_t now = now_monotonic_usec(); worker->statistics_last_checkpoint = now; worker->last_action_timestamp = now; worker->last_action = WORKER_IDLE; - size_t workname_size = strlen(workname) + 1; - netdata_mutex_lock(&workers_base_lock); + size_t name_size = strlen(name) + 1; + 
netdata_spinlock_lock(&workers_globals.spinlock); - Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)workname, workname_size); + Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, name_size); if(!PValue) - PValue = JudyHSIns(&workers_per_workname_JudyHS_array, (void *)workname, workname_size, PJE0); + PValue = JudyHSIns(&workers_globals.worknames_JudyHS, (void *)name, name_size, PJE0); + + struct workers_workname *workname = *PValue; + if(!workname) { + workname = mallocz(sizeof(struct workers_workname)); + workname->spinlock = NETDATA_SPINLOCK_INITIALIZER; + workname->base = NULL; + *PValue = workname; + } - struct worker *base = *PValue; - DOUBLE_LINKED_LIST_APPEND_UNSAFE(base, worker, prev, next); - *PValue = base; + netdata_spinlock_lock(&workname->spinlock); + DOUBLE_LINKED_LIST_APPEND_UNSAFE(workname->base, worker, prev, next); + netdata_spinlock_unlock(&workname->spinlock); - netdata_mutex_unlock(&workers_base_lock); + netdata_spinlock_unlock(&workers_globals.spinlock); } void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type) { @@ -105,17 +125,20 @@ void worker_unregister(void) { if(unlikely(!worker)) return; size_t workname_size = strlen(worker->workname) + 1; - netdata_mutex_lock(&workers_base_lock); - Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)worker->workname, workname_size); + netdata_spinlock_lock(&workers_globals.spinlock); + Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)worker->workname, workname_size); if(PValue) { - struct worker *base = *PValue; - DOUBLE_LINKED_LIST_REMOVE_UNSAFE(base, worker, prev, next); - *PValue = base; - - if(!base) - JudyHSDel(&workers_per_workname_JudyHS_array, (void *)worker->workname, workname_size, PJE0); + struct workers_workname *workname = *PValue; + netdata_spinlock_lock(&workname->spinlock); + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(workname->base, worker, prev, 
next); + netdata_spinlock_unlock(&workname->spinlock); + + if(!workname->base) { + JudyHSDel(&workers_globals.worknames_JudyHS, (void *) worker->workname, workname_size, PJE0); + freez(workname); + } } - netdata_mutex_unlock(&workers_base_lock); + netdata_spinlock_unlock(&workers_globals.spinlock); for(int i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) { string_freez(worker->per_job_type[i].name); @@ -187,7 +210,7 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) { // statistics interface -void workers_foreach(const char *workname, void (*callback)( +void workers_foreach(const char *name, void (*callback)( void *data , pid_t pid , const char *thread_tag @@ -203,18 +226,27 @@ void workers_foreach(const char *workname, void (*callback)( , NETDATA_DOUBLE *job_custom_values ) , void *data) { - netdata_mutex_lock(&workers_base_lock); + netdata_spinlock_lock(&workers_globals.spinlock); usec_t busy_time, delta; size_t i, jobs_started, jobs_running; - size_t workname_size = strlen(workname) + 1; - struct worker *base = NULL; - Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)workname, workname_size); - if(PValue) - base = *PValue; + size_t workname_size = strlen(name) + 1; + struct workers_workname *workname; + Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, workname_size); + if(PValue) { + workname = *PValue; + netdata_spinlock_lock(&workname->spinlock); + } + else + workname = NULL; + + netdata_spinlock_unlock(&workers_globals.spinlock); + + if(!workname) + return; struct worker *p; - DOUBLE_LINKED_LIST_FOREACH_FORWARD(base, p, prev, next) { + DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) { usec_t now = now_monotonic_usec(); // find per job type statistics @@ -326,5 +358,5 @@ void workers_foreach(const char *workname, void (*callback)( ); } - netdata_mutex_unlock(&workers_base_lock); + netdata_spinlock_unlock(&workname->spinlock); } diff --git 
a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h index 04d24f1f7..f1412e6b4 100644 --- a/libnetdata/worker_utilization/worker_utilization.h +++ b/libnetdata/worker_utilization/worker_utilization.h @@ -15,7 +15,7 @@ typedef enum { WORKER_METRIC_INCREMENTAL_TOTAL = 4, } WORKER_METRIC_TYPE; -void worker_register(const char *workname); +void worker_register(const char *name); void worker_register_job_name(size_t job_id, const char *name); void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type); void worker_unregister(void); @@ -26,7 +26,7 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value); // statistics interface -void workers_foreach(const char *workname, void (*callback)( +void workers_foreach(const char *name, void (*callback)( void *data , pid_t pid , const char *thread_tag diff --git a/packaging/version b/packaging/version index b909b32cd..094f172cf 100644 --- a/packaging/version +++ b/packaging/version @@ -1 +1 @@ -v1.37.0 +v1.37.1 diff --git a/streaming/replication.c b/streaming/replication.c index 8fa501061..d659d701d 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -4,7 +4,7 @@ #include "Judy.h" #define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 #define WORKER_JOB_FIND_NEXT 1 @@ -14,17 +14,17 @@ #define WORKER_JOB_CHECK_CONSISTENCY 5 #define WORKER_JOB_BUFFER_COMMIT 6 #define WORKER_JOB_CLEANUP 7 +#define WORKER_JOB_WAIT 8 // master thread worker jobs -#define WORKER_JOB_STATISTICS 8 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 11 -#define WORKER_JOB_CUSTOM_METRIC_DONE 12 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13 -#define 
WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14 -#define WORKER_JOB_CUSTOM_METRIC_WAITS 15 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 +#define WORKER_JOB_STATISTICS 9 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 13 +#define WORKER_JOB_CUSTOM_METRIC_DONE 14 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 @@ -591,6 +591,7 @@ struct replication_request { Word_t unique_id; // auto-increment, later requests have bigger bool found; // used as a result boolean for the find call bool indexed_in_judy; // true when the request is indexed in judy + bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full }; // replication sort entry in JudyL array @@ -605,7 +606,7 @@ struct replication_sort_entry { // the global variables for the replication thread static struct replication_thread { - netdata_mutex_t mutex; + SPINLOCK spinlock; struct { size_t pending; // number of requests pending in the queue @@ -614,9 +615,8 @@ static struct replication_thread { // statistics size_t added; // number of requests added to the queue size_t removed; // number of requests removed from the queue - size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent - size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses -// size_t skipped_no_room_since_last_reset; + size_t pending_no_room; // number of requests skipped, because the sender has no room for responses + size_t senders_full; // number of senders whose buffer is currently full (replication paused for them) size_t sender_resets; // number of times a sender reset our last position in the queue time_t first_time_t; // 
the minimum 'after' we encountered @@ -634,7 +634,6 @@ static struct replication_thread { } atomic; // access should be with atomic operations struct { - size_t waits; size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time netdata_thread_t **threads_ptrs; @@ -642,17 +641,16 @@ static struct replication_thread { } main_thread; // access is allowed only by the main thread } replication_globals = { - .mutex = NETDATA_MUTEX_INITIALIZER, + .spinlock = NETDATA_SPINLOCK_INITIALIZER, .unsafe = { .pending = 0, .unique_id = 0, .added = 0, .removed = 0, - .skipped_not_connected = 0, - .skipped_no_room = 0, -// .skipped_no_room_since_last_reset = 0, + .pending_no_room = 0, .sender_resets = 0, + .senders_full = 0, .first_time_t = 0, @@ -667,7 +665,6 @@ static struct replication_thread { .latest_first_time = 0, }, .main_thread = { - .waits = 0, .last_executed = 0, .threads = 0, .threads_ptrs = NULL, @@ -682,11 +679,11 @@ static inline bool replication_recursive_lock_mode(char mode) { if(mode == 'L') { // (L)ock if(++recursions == 1) - netdata_mutex_lock(&replication_globals.mutex); + netdata_spinlock_lock(&replication_globals.spinlock); } else if(mode == 'U') { // (U)nlock if(--recursions == 0) - netdata_mutex_unlock(&replication_globals.mutex); + netdata_spinlock_unlock(&replication_globals.spinlock); } else if(mode == 'C') { // (C)heck if(recursions > 0) @@ -736,6 +733,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = false; return rse; } @@ -743,9 +741,20 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { freez(rse); } -static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) { +static void replication_sort_entry_add(struct replication_request *rq) { 
replication_recursive_lock(); + if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { + rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = true; + replication_globals.unsafe.pending_no_room++; + replication_recursive_unlock(); + return; + } + + if(rq->not_indexed_buffer_full) + replication_globals.unsafe.pending_no_room--; + struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq); // if(rq->after < (time_t)replication_globals.protected.queue.after && @@ -770,13 +779,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; rq->indexed_in_judy = true; + rq->not_indexed_buffer_full = false; if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); - - return rse; } static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { @@ -806,7 +814,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor return inner_judy_deleted; } -static void replication_sort_entry_del(struct replication_request *rq) { +static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) { Pvoid_t *inner_judy_pptr; struct replication_sort_entry *rse_to_delete = NULL; @@ -819,6 +827,11 @@ static void replication_sort_entry_del(struct replication_request *rq) { if (our_item_pptr) { rse_to_delete = *our_item_pptr; replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + + if(buffer_full) { + replication_globals.unsafe.pending_no_room++; + rq->not_indexed_buffer_full = true; + } } } @@ -877,44 +890,17 @@ static struct replication_request replication_request_get_first_available() { while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, 
&replication_globals.unsafe.queue.unique_id, PJE0))) { struct replication_sort_entry *rse = *our_item_pptr; struct replication_request *rq = rse->rq; - struct sender_state *s = rq->sender; - - if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) { - // there is room for this request in the sender buffer - - bool sender_is_connected = - rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - - bool sender_has_been_flushed_since_this_request = - rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); - if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { - // skip this request, the sender is not connected, or it has reconnected + // copy the request to return it + rq_to_return = *rq; + rq_to_return.chart_id = string_dup(rq_to_return.chart_id); - replication_globals.unsafe.skipped_not_connected++; - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; - } - else { - // this request is good to execute + // set the return result to found + rq_to_return.found = true; - // copy the request to return it - rq_to_return = *rq; - rq_to_return.chart_id = string_dup(rq_to_return.chart_id); - - // set the return result to found - rq_to_return.found = true; - - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; - } - } - else { - replication_globals.unsafe.skipped_no_room++; -// replication_globals.protected.skipped_no_room_since_last_reset++; - } + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; } // call JudyLNext from now on @@ -959,7 +945,20 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ replication_recursive_lock(); - if(!rq->indexed_in_judy) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + // we can 
replace this command + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + + rq->after = rq_new->after; + rq->before = rq_new->before; + rq->start_streaming = rq_new->start_streaming; + } + else if(!rq->indexed_in_judy) { replication_sort_entry_add(rq); internal_error( true, @@ -991,7 +990,13 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma rrdpush_sender_replicating_charts_minus_one(rq->sender); if(rq->indexed_in_judy) - replication_sort_entry_del(rq); + replication_sort_entry_del(rq, false); + + else if(rq->not_indexed_buffer_full) { + replication_recursive_lock(); + replication_globals.unsafe.pending_no_room--; + replication_recursive_unlock(); + } string_freez(rq->chart_id); } @@ -1046,6 +1051,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor cleanup: string_freez(rq->chart_id); + worker_is_idle(); return ret; } @@ -1060,6 +1066,8 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .before = before, .start_streaming = start_streaming, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), + .indexed_in_judy = false, + .not_indexed_buffer_full = false, }; if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) @@ -1094,15 +1102,36 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); size_t percentage = 
(s->buffer->max_size - available) * 100 / s->buffer->max_size; - if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) - s->replication.unsafe.reached_max = true; + if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, true); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) { + replication_sort_entry_del(rq, true); + } + } + dfe_done(rq); - if(s->replication.unsafe.reached_max && - percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { - s->replication.unsafe.reached_max = false; replication_recursive_lock(); -// replication_set_next_point_in_time(0, 0); + replication_globals.unsafe.senders_full++; + replication_recursive_unlock(); + } + else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, false); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + replication_sort_entry_add(rq); + } + } + dfe_done(rq); + + replication_recursive_lock(); + replication_globals.unsafe.senders_full--; replication_globals.unsafe.sender_resets++; + // replication_set_next_point_in_time(0, 0); replication_recursive_unlock(); } @@ -1188,17 +1217,17 @@ static void replication_initialize_workers(bool master) { worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); + worker_register_job_name(WORKER_JOB_WAIT, "wait"); if(master) { worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); + 
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE); } } @@ -1210,8 +1239,10 @@ static int replication_execute_next_pending_request(void) { worker_is_busy(WORKER_JOB_FIND_NEXT); struct replication_request rq = replication_request_get_first_available(); - if(unlikely(!rq.found)) + if(unlikely(!rq.found)) { + worker_is_idle(); return REQUEST_QUEUE_EMPTY; + } // delete the request from the dictionary worker_is_busy(WORKER_JOB_DELETE_ENTRY); @@ -1221,9 +1252,12 @@ static int replication_execute_next_pending_request(void) { replication_set_latest_first_time(rq.after); - if(unlikely(!replication_execute_request(&rq, true))) + if(unlikely(!replication_execute_request(&rq, true))) { + worker_is_idle(); return REQUEST_CHART_NOT_FOUND; + } + worker_is_idle(); return REQUEST_OK; } @@ -1238,6 +1272,7 @@ static void *replication_worker_thread(void 
*ptr) { while(!netdata_exit) { if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); sleep_usec(1 * USEC_PER_SEC); } @@ -1305,6 +1340,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { last_now_mono_ut = now_mono_ut; + worker_is_busy(WORKER_JOB_STATISTICS); replication_recursive_lock(); size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); @@ -1321,19 +1357,21 @@ void *replication_thread_main(void *ptr __maybe_unused) { replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; } - if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) { - // reset the statistics about completion percentage - replication_globals.unsafe.first_time_t = 0; - replication_set_latest_first_time(0); + if(--run_verification_countdown == 0) { + if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) { + // reset the statistics about completion percentage + replication_globals.unsafe.first_time_t = 0; + replication_set_latest_first_time(0); - verify_all_hosts_charts_are_streaming_now(); + verify_all_hosts_charts_are_streaming_now(); - run_verification_countdown = LONG_MAX; - slow = true; + run_verification_countdown = LONG_MAX; + slow = true; + } + else + run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; } - worker_is_busy(WORKER_JOB_STATISTICS); - time_t latest_first_time_t = replication_get_latest_first_time(); if(latest_first_time_t && replication_globals.unsafe.pending) { // completion percentage statistics @@ -1349,15 +1387,17 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, 
(NETDATA_DOUBLE)replication_globals.unsafe.added); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED)); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full); replication_recursive_unlock(); + worker_is_idle(); } if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + + worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); // the timeout also defines now frequently we will traverse all the pending requests @@ -1388,7 +1428,6 @@ void *replication_thread_main(void *ptr __maybe_unused) { last_sender_resets = replication_globals.unsafe.sender_resets; } - replication_globals.main_thread.waits++; replication_recursive_unlock(); worker_is_idle(); diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index c5f7618c1..a0c7e8de2 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -168,12 +168,9 @@ struct sender_state { struct { size_t pending_requests; // the currently outstanding replication requests size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + bool reached_max; // true when the sender buffer should not get 
more replication responses } atomic; - struct { - bool reached_max; // used to avoid resetting the replication thread too frequently - } unsafe; // protected by sender mutex - } replication; struct { @@ -182,10 +179,13 @@ struct sender_state { } atomic; }; -#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED); +#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST) +#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST) + +#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED) #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED) -#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED) #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED) #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED) diff --git a/streaming/sender.c b/streaming/sender.c index 8e637d2bd..62097e39f 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1114,9 +1114,14 @@ void *rrdpush_sender_thread(void *ptr) { } #ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){ - security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); - ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); + if (netdata_use_ssl_on_stream & 
NETDATA_SSL_FORCE ) { + static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; + netdata_spinlock_lock(&sp); + if(!netdata_ssl_client_ctx) { + security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); + ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); + } + netdata_spinlock_unlock(&sp); } #endif diff --git a/web/api/queries/query.c b/web/api/queries/query.c index ccd195135..0365b6e96 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -1496,15 +1496,15 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) struct storage_engine_query_handle handle; // for each lower tier - for(int tr = (int)tier - 1; tr >= 0 ;tr--){ - time_t smaller_tier_first_time = rd->tiers[tr]->query_ops->oldest_time(rd->tiers[tr]->db_metric_handle); - time_t smaller_tier_last_time = rd->tiers[tr]->query_ops->latest_time(rd->tiers[tr]->db_metric_handle); + for(int read_tier = (int)tier - 1; read_tier >= 0 ; read_tier--){ + time_t smaller_tier_first_time = rd->tiers[read_tier]->query_ops->oldest_time(rd->tiers[read_tier]->db_metric_handle); + time_t smaller_tier_last_time = rd->tiers[read_tier]->query_ops->latest_time(rd->tiers[read_tier]->db_metric_handle); if(smaller_tier_last_time <= latest_time_t) continue; // it is as bad as we are long after_wanted = (latest_time_t < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_t; long before_wanted = smaller_tier_last_time; - struct rrddim_tier *tmp = rd->tiers[tr]; + struct rrddim_tier *tmp = rd->tiers[read_tier]; tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted); size_t points_read = 0; @@ -1516,7 +1516,7 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) if(sp.end_time > latest_time_t) { latest_time_t = sp.end_time; - store_metric_at_tier(rd, tr, t, sp, sp.end_time * USEC_PER_SEC); + store_metric_at_tier(rd, tier, t, sp, sp.end_time * USEC_PER_SEC); } } -- cgit v1.2.3