summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/data/distros.yml2
-rw-r--r--CHANGELOG.md25
-rw-r--r--collectors/diskspace.plugin/plugin_diskspace.c2
-rw-r--r--collectors/proc.plugin/ipc.c8
-rw-r--r--daemon/global_statistics.c267
-rw-r--r--daemon/service.c6
-rw-r--r--database/engine/pagecache.c9
-rw-r--r--database/engine/pagecache.h1
-rw-r--r--database/engine/rrdengine.c36
-rw-r--r--database/engine/rrdengine.h2
-rwxr-xr-xdatabase/engine/rrdengineapi.c98
-rw-r--r--database/engine/rrdengineapi.h10
-rw-r--r--database/ram/rrddim_mem.c8
-rw-r--r--database/ram/rrddim_mem.h6
-rw-r--r--database/rrd.h6
-rw-r--r--database/rrdcontext.c7
-rw-r--r--database/rrddim.c12
-rw-r--r--database/sqlite/sqlite_context.c2
-rw-r--r--database/sqlite/sqlite_functions.c2
-rw-r--r--libnetdata/dictionary/dictionary.c42
-rw-r--r--libnetdata/procfile/procfile.c67
-rw-r--r--libnetdata/procfile/procfile.h15
-rw-r--r--libnetdata/socket/security.c38
-rw-r--r--libnetdata/string/string.c9
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c94
-rw-r--r--libnetdata/worker_utilization/worker_utilization.h4
-rw-r--r--packaging/version2
-rw-r--r--streaming/replication.c213
-rw-r--r--streaming/rrdpush.h12
-rw-r--r--streaming/sender.c11
-rw-r--r--web/api/queries/query.c10
31 files changed, 624 insertions, 402 deletions
diff --git a/.github/data/distros.yml b/.github/data/distros.yml
index 0f5718645..cc5275298 100644
--- a/.github/data/distros.yml
+++ b/.github/data/distros.yml
@@ -104,7 +104,7 @@ include:
dnf remove -y json-c-devel
packages: &fedora_packages
type: rpm
- repo_distro: fedora/36
+ repo_distro: fedora/37
arches:
- x86_64
- aarch64
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a46b77108..7619153da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,22 @@
# Changelog
+## [v1.37.1](https://github.com/netdata/netdata/tree/v1.37.1) (2022-12-05)
+
+[Full Changelog](https://github.com/netdata/netdata/compare/v1.37.0...v1.37.1)
+
+**Merged pull requests:**
+
+- fix v1.37 dbengine page alignment crashes [\#14086](https://github.com/netdata/netdata/pull/14086) ([ktsaou](https://github.com/ktsaou))
+- Fix \_\_atomic\_compare\_exchange\_n\(\) atomics [\#14085](https://github.com/netdata/netdata/pull/14085) ([ktsaou](https://github.com/ktsaou))
+- Fix 1.37 crashes [\#14081](https://github.com/netdata/netdata/pull/14081) ([stelfrag](https://github.com/stelfrag))
+- add basic dashboard info for NGINX Plus [\#14080](https://github.com/netdata/netdata/pull/14080) ([ilyam8](https://github.com/ilyam8))
+- replication fixes 9 [\#14079](https://github.com/netdata/netdata/pull/14079) ([ktsaou](https://github.com/ktsaou))
+- optimize workers statistics performance [\#14077](https://github.com/netdata/netdata/pull/14077) ([ktsaou](https://github.com/ktsaou))
+- fix SSL related crashes [\#14076](https://github.com/netdata/netdata/pull/14076) ([ktsaou](https://github.com/ktsaou))
+- remove python.d/springboot [\#14075](https://github.com/netdata/netdata/pull/14075) ([ilyam8](https://github.com/ilyam8))
+- fix backfilling statistics [\#14074](https://github.com/netdata/netdata/pull/14074) ([ktsaou](https://github.com/ktsaou))
+- Add workflow dispatch trigger for parent/child with cloud integration smoke tests [\#14070](https://github.com/netdata/netdata/pull/14070) ([dimko](https://github.com/dimko))
+
## [v1.37.0](https://github.com/netdata/netdata/tree/v1.37.0) (2022-11-30)
[Full Changelog](https://github.com/netdata/netdata/compare/v1.36.1...v1.37.0)
@@ -298,7 +315,6 @@
- Add Fedora 37 to CI and package builds. [\#13489](https://github.com/netdata/netdata/pull/13489) ([Ferroin](https://github.com/Ferroin))
- Overhaul handling of installation of Netdata as a system service. [\#13451](https://github.com/netdata/netdata/pull/13451) ([Ferroin](https://github.com/Ferroin))
- reduce memcpy and memory usage on mqtt5 [\#13450](https://github.com/netdata/netdata/pull/13450) ([underhood](https://github.com/underhood))
-- Remove Alpine 3.13 from CI and official support. [\#13415](https://github.com/netdata/netdata/pull/13415) ([Ferroin](https://github.com/Ferroin))
## [v1.36.1](https://github.com/netdata/netdata/tree/v1.36.1) (2022-08-15)
@@ -347,13 +363,6 @@
- Dont duplicate buffered bytes [\#13435](https://github.com/netdata/netdata/pull/13435) ([vlvkobal](https://github.com/vlvkobal))
- Show last 15 alerts in notification [\#13434](https://github.com/netdata/netdata/pull/13434) ([MrZammler](https://github.com/MrZammler))
- Query queue only for queries [\#13431](https://github.com/netdata/netdata/pull/13431) ([underhood](https://github.com/underhood))
-- Remove octopus from demo-sites [\#13423](https://github.com/netdata/netdata/pull/13423) ([cakrit](https://github.com/cakrit))
-- Tiering statistics API endpoint [\#13420](https://github.com/netdata/netdata/pull/13420) ([ktsaou](https://github.com/ktsaou))
-- add discord, youtube, linkedin links to README [\#13419](https://github.com/netdata/netdata/pull/13419) ([andrewm4894](https://github.com/andrewm4894))
-- add ML bullet point to features section on README [\#13418](https://github.com/netdata/netdata/pull/13418) ([andrewm4894](https://github.com/andrewm4894))
-- Set value to SN\_EMPTY\_SLOT if flags is SN\_EMPTY\_SLOT [\#13417](https://github.com/netdata/netdata/pull/13417) ([MrZammler](https://github.com/MrZammler))
-- Add missing comma \(handle coverity warning CID 379360\) [\#13413](https://github.com/netdata/netdata/pull/13413) ([stelfrag](https://github.com/stelfrag))
-- codacy/lgtm ignore judy sources [\#13411](https://github.com/netdata/netdata/pull/13411) ([underhood](https://github.com/underhood))
## [v1.35.1](https://github.com/netdata/netdata/tree/v1.35.1) (2022-06-10)
diff --git a/collectors/diskspace.plugin/plugin_diskspace.c b/collectors/diskspace.plugin/plugin_diskspace.c
index 5f610983b..e806a3360 100644
--- a/collectors/diskspace.plugin/plugin_diskspace.c
+++ b/collectors/diskspace.plugin/plugin_diskspace.c
@@ -320,7 +320,7 @@ static inline void do_disk_space_stats(struct mountinfo *mi, int update_every) {
, SIMPLE_PATTERN_EXACT
);
- dict_mountpoints = dictionary_create(DICT_OPTION_SINGLE_THREADED);
+ dict_mountpoints = dictionary_create(DICT_OPTION_NONE);
}
struct mount_point_metadata *m = dictionary_get(dict_mountpoints, mi->mount_point);
diff --git a/collectors/proc.plugin/ipc.c b/collectors/proc.plugin/ipc.c
index 9185894eb..7d3d2ecbb 100644
--- a/collectors/proc.plugin/ipc.c
+++ b/collectors/proc.plugin/ipc.c
@@ -195,7 +195,7 @@ int ipc_msq_get_info(char *msg_filename, struct message_queue **message_queue_ro
size_t words = 0;
if(unlikely(lines < 2)) {
- error("Cannot read %s. Expected 2 or more lines, read %zu.", ff->filename, lines);
+ error("Cannot read %s. Expected 2 or more lines, read %zu.", procfile_filename(ff), lines);
return 1;
}
@@ -205,7 +205,7 @@ int ipc_msq_get_info(char *msg_filename, struct message_queue **message_queue_ro
words = procfile_linewords(ff, l);
if(unlikely(words < 2)) continue;
if(unlikely(words < 14)) {
- error("Cannot read %s line. Expected 14 params, read %zu.", ff->filename, words);
+ error("Cannot read %s line. Expected 14 params, read %zu.", procfile_filename(ff), words);
continue;
}
@@ -250,7 +250,7 @@ int ipc_shm_get_info(char *shm_filename, struct shm_stats *shm) {
size_t words = 0;
if(unlikely(lines < 2)) {
- error("Cannot read %s. Expected 2 or more lines, read %zu.", ff->filename, lines);
+ error("Cannot read %s. Expected 2 or more lines, read %zu.", procfile_filename(ff), lines);
return 1;
}
@@ -263,7 +263,7 @@ int ipc_shm_get_info(char *shm_filename, struct shm_stats *shm) {
words = procfile_linewords(ff, l);
if(unlikely(words < 2)) continue;
if(unlikely(words < 16)) {
- error("Cannot read %s line. Expected 16 params, read %zu.", ff->filename, words);
+ error("Cannot read %s line. Expected 16 params, read %zu.", procfile_filename(ff), words);
continue;
}
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index 53fd6c45a..a4e9d321f 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -12,9 +12,10 @@
#define WORKER_JOB_STRINGS 5
#define WORKER_JOB_DICTIONARIES 6
#define WORKER_JOB_MALLOC_TRACE 7
+#define WORKER_JOB_SQLITE3 8
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 8
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 8
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 9
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 9
#endif
bool global_statistics_enabled = true;
@@ -60,21 +61,6 @@ static struct global_statistics {
uint64_t db_points_stored_per_tier[RRD_STORAGE_TIERS];
- uint64_t sqlite3_queries_made;
- uint64_t sqlite3_queries_ok;
- uint64_t sqlite3_queries_failed;
- uint64_t sqlite3_queries_failed_busy;
- uint64_t sqlite3_queries_failed_locked;
- uint64_t sqlite3_rows;
- uint64_t sqlite3_metadata_cache_hit;
- uint64_t sqlite3_context_cache_hit;
- uint64_t sqlite3_metadata_cache_miss;
- uint64_t sqlite3_context_cache_miss;
- uint64_t sqlite3_metadata_cache_spill;
- uint64_t sqlite3_context_cache_spill;
- uint64_t sqlite3_metadata_cache_write;
- uint64_t sqlite3_context_cache_write;
-
} global_statistics = {
.connected_clients = 0,
.web_requests = 0,
@@ -112,27 +98,6 @@ void global_statistics_backfill_query_completed(size_t points_read) {
__atomic_fetch_add(&global_statistics.backfill_db_points_read, points_read, __ATOMIC_RELAXED);
}
-void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked) {
- __atomic_fetch_add(&global_statistics.sqlite3_queries_made, 1, __ATOMIC_RELAXED);
-
- if(success) {
- __atomic_fetch_add(&global_statistics.sqlite3_queries_ok, 1, __ATOMIC_RELAXED);
- }
- else {
- __atomic_fetch_add(&global_statistics.sqlite3_queries_failed, 1, __ATOMIC_RELAXED);
-
- if(busy)
- __atomic_fetch_add(&global_statistics.sqlite3_queries_failed_busy, 1, __ATOMIC_RELAXED);
-
- if(locked)
- __atomic_fetch_add(&global_statistics.sqlite3_queries_failed_locked, 1, __ATOMIC_RELAXED);
- }
-}
-
-void global_statistics_sqlite3_row_completed(void) {
- __atomic_fetch_add(&global_statistics.sqlite3_rows, 1, __ATOMIC_RELAXED);
-}
-
void global_statistics_rrdr_query_completed(size_t queries, uint64_t db_points_read, uint64_t result_points_generated, QUERY_SOURCE query_source) {
switch(query_source) {
case QUERY_SOURCE_API_DATA:
@@ -241,25 +206,6 @@ static inline void global_statistics_copy(struct global_statistics *gs, uint8_t
uint64_t n = 0;
__atomic_compare_exchange(&global_statistics.web_usec_max, (uint64_t *) &gs->web_usec_max, &n, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}
-
- gs->sqlite3_queries_made = __atomic_load_n(&global_statistics.sqlite3_queries_made, __ATOMIC_RELAXED);
- gs->sqlite3_queries_ok = __atomic_load_n(&global_statistics.sqlite3_queries_ok, __ATOMIC_RELAXED);
- gs->sqlite3_queries_failed = __atomic_load_n(&global_statistics.sqlite3_queries_failed, __ATOMIC_RELAXED);
- gs->sqlite3_queries_failed_busy = __atomic_load_n(&global_statistics.sqlite3_queries_failed_busy, __ATOMIC_RELAXED);
- gs->sqlite3_queries_failed_locked = __atomic_load_n(&global_statistics.sqlite3_queries_failed_locked, __ATOMIC_RELAXED);
- gs->sqlite3_rows = __atomic_load_n(&global_statistics.sqlite3_rows, __ATOMIC_RELAXED);
-
- gs->sqlite3_metadata_cache_hit = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_HIT);
- gs->sqlite3_context_cache_hit = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_HIT);
-
- gs->sqlite3_metadata_cache_miss = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_MISS);
- gs->sqlite3_context_cache_miss = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_MISS);
-
- gs->sqlite3_metadata_cache_spill = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL);
- gs->sqlite3_context_cache_spill = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL);
-
- gs->sqlite3_metadata_cache_write = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE);
- gs->sqlite3_context_cache_write = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE);
}
static void global_statistics_charts(void) {
@@ -707,8 +653,129 @@ static void global_statistics_charts(void) {
rrdset_done(st_points_stored);
}
+}
- // ----------------------------------------------------------------
+// ----------------------------------------------------------------------------
+// sqlite3 statistics
+
+struct sqlite3_statistics {
+ uint64_t sqlite3_queries_made;
+ uint64_t sqlite3_queries_ok;
+ uint64_t sqlite3_queries_failed;
+ uint64_t sqlite3_queries_failed_busy;
+ uint64_t sqlite3_queries_failed_locked;
+ uint64_t sqlite3_rows;
+ uint64_t sqlite3_metadata_cache_hit;
+ uint64_t sqlite3_context_cache_hit;
+ uint64_t sqlite3_metadata_cache_miss;
+ uint64_t sqlite3_context_cache_miss;
+ uint64_t sqlite3_metadata_cache_spill;
+ uint64_t sqlite3_context_cache_spill;
+ uint64_t sqlite3_metadata_cache_write;
+ uint64_t sqlite3_context_cache_write;
+
+} sqlite3_statistics = { };
+
+void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked) {
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_made, 1, __ATOMIC_RELAXED);
+
+ if(success) {
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_ok, 1, __ATOMIC_RELAXED);
+ }
+ else {
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed, 1, __ATOMIC_RELAXED);
+
+ if(busy)
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed_busy, 1, __ATOMIC_RELAXED);
+
+ if(locked)
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_queries_failed_locked, 1, __ATOMIC_RELAXED);
+ }
+}
+
+void global_statistics_sqlite3_row_completed(void) {
+ __atomic_fetch_add(&sqlite3_statistics.sqlite3_rows, 1, __ATOMIC_RELAXED);
+}
+
+static inline void sqlite3_statistics_copy(struct sqlite3_statistics *gs) {
+ static usec_t last_run = 0;
+
+ gs->sqlite3_queries_made = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_made, __ATOMIC_RELAXED);
+ gs->sqlite3_queries_ok = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_ok, __ATOMIC_RELAXED);
+ gs->sqlite3_queries_failed = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed, __ATOMIC_RELAXED);
+ gs->sqlite3_queries_failed_busy = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed_busy, __ATOMIC_RELAXED);
+ gs->sqlite3_queries_failed_locked = __atomic_load_n(&sqlite3_statistics.sqlite3_queries_failed_locked, __ATOMIC_RELAXED);
+ gs->sqlite3_rows = __atomic_load_n(&sqlite3_statistics.sqlite3_rows, __ATOMIC_RELAXED);
+
+ usec_t timeout = default_rrd_update_every * USEC_PER_SEC + default_rrd_update_every * USEC_PER_SEC / 3;
+ usec_t now = now_monotonic_usec();
+ if(!last_run)
+ last_run = now;
+ usec_t delta = now - last_run;
+ bool query_sqlite3 = delta < timeout;
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_metadata_cache_hit = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_HIT);
+ else {
+ gs->sqlite3_metadata_cache_hit = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_context_cache_hit = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_HIT);
+ else {
+ gs->sqlite3_context_cache_hit = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_metadata_cache_miss = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_MISS);
+ else {
+ gs->sqlite3_metadata_cache_miss = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_context_cache_miss = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_MISS);
+ else {
+ gs->sqlite3_context_cache_miss = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_metadata_cache_spill = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL);
+ else {
+ gs->sqlite3_metadata_cache_spill = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_context_cache_spill = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_SPILL);
+ else {
+ gs->sqlite3_context_cache_spill = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_metadata_cache_write = (uint64_t) sql_metadata_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE);
+ else {
+ gs->sqlite3_metadata_cache_write = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ if(query_sqlite3 && now_monotonic_usec() - last_run < timeout)
+ gs->sqlite3_context_cache_write = (uint64_t) sql_context_cache_stats(SQLITE_DBSTATUS_CACHE_WRITE);
+ else {
+ gs->sqlite3_context_cache_write = UINT64_MAX;
+ query_sqlite3 = false;
+ }
+
+ last_run = now_monotonic_usec();
+}
+
+static void sqlite3_statistics_charts(void) {
+ struct sqlite3_statistics gs;
+ sqlite3_statistics_copy(&gs);
if(gs.sqlite3_queries_made) {
static RRDSET *st_sqlite3_queries = NULL;
@@ -833,10 +900,17 @@ static void global_statistics_charts(void) {
rd_cache_write = rrddim_add(st_sqlite3_cache, "cache_write", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_metadata_cache_hit);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_metadata_cache_miss);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_metadata_cache_spill);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_metadata_cache_write);
+ if(gs.sqlite3_metadata_cache_hit != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_metadata_cache_hit);
+
+ if(gs.sqlite3_metadata_cache_miss != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_metadata_cache_miss);
+
+ if(gs.sqlite3_metadata_cache_spill != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_metadata_cache_spill);
+
+ if(gs.sqlite3_metadata_cache_write != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_metadata_cache_write);
rrdset_done(st_sqlite3_cache);
}
@@ -870,10 +944,17 @@ static void global_statistics_charts(void) {
rd_cache_write = rrddim_add(st_sqlite3_cache, "cache_write", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_context_cache_hit);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_context_cache_miss);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_context_cache_spill);
- rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_context_cache_write);
+ if(gs.sqlite3_context_cache_hit != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_hit, (collected_number)gs.sqlite3_context_cache_hit);
+
+ if(gs.sqlite3_context_cache_miss != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_miss, (collected_number)gs.sqlite3_context_cache_miss);
+
+ if(gs.sqlite3_context_cache_spill != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_spill, (collected_number)gs.sqlite3_context_cache_spill);
+
+ if(gs.sqlite3_context_cache_write != UINT64_MAX)
+ rrddim_set_by_pointer(st_sqlite3_cache, rd_cache_write, (collected_number)gs.sqlite3_context_cache_write);
rrdset_done(st_sqlite3_cache);
}
@@ -2322,7 +2403,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.value.%s", wu->name_lowercase, job_name_sanitized);
char title[1000 + 1];
- snprintf(title, 1000, "Netdata Workers %s Value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
+ snprintf(title, 1000, "Netdata Workers %s value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
wu->per_job_type[i].st = rrdset_create_localhost(
"netdata"
@@ -2334,7 +2415,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
, (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"value"
, "netdata"
, "stats"
- , wu->priority + 5
+ , wu->priority + 5 + i
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -2378,7 +2459,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.rate.%s", wu->name_lowercase, job_name_sanitized);
char title[1000 + 1];
- snprintf(title, 1000, "Netdata Workers %s Rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
+ snprintf(title, 1000, "Netdata Workers %s rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
wu->per_job_type[i].st = rrdset_create_localhost(
"netdata"
@@ -2390,7 +2471,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
, (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"rate"
, "netdata"
, "stats"
- , wu->priority + 5
+ , wu->priority + 5 + i
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -2447,21 +2528,37 @@ static void workers_utilization_reset_statistics(struct worker_utilization *wu)
}
}
+#define TASK_STAT_PREFIX "/proc/self/task/"
+#define TASK_STAT_SUFFIX "/stat"
+
static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_uint_t *utime __maybe_unused, kernel_uint_t *stime __maybe_unused) {
#ifdef __linux__
- char filename[200 + 1];
- snprintfz(filename, 200, "/proc/self/task/%d/stat", pid);
+ static char filename[sizeof(TASK_STAT_PREFIX) + sizeof(TASK_STAT_SUFFIX) + 20] = TASK_STAT_PREFIX;
+ static size_t start_pos = sizeof(TASK_STAT_PREFIX) - 1;
+ static procfile *ff = NULL;
- procfile *ff = procfile_open(filename, " ", PROCFILE_FLAG_NO_ERROR_ON_FILE_IO);
- if(!ff) return -1;
+ // construct the filename
+ size_t end_pos = snprintfz(&filename[start_pos], 20, "%d", pid);
+ strcpy(&filename[start_pos + end_pos], TASK_STAT_SUFFIX);
+ // (re)open the procfile to the new filename
+ bool set_quotes = (ff == NULL) ? true : false;
+ ff = procfile_reopen(ff, filename, NULL, PROCFILE_FLAG_DEFAULT);
+ if(unlikely(!ff)) return -1;
+
+ if(set_quotes)
+ procfile_set_open_close(ff, "(", ")");
+
+ // read the entire file and split it to lines and words
ff = procfile_readall(ff);
- if(!ff) return -1;
+ if(unlikely(!ff)) return -1;
+ // parse the numbers we are interested
*utime = str2kernel_uint_t(procfile_lineword(ff, 0, 13));
*stime = str2kernel_uint_t(procfile_lineword(ff, 0, 14));
- procfile_close(ff);
+ // leave the file open for the next iteration
+
return 0;
#else
// TODO: add here cpu time detection per thread, for FreeBSD and MacOS
@@ -2474,8 +2571,6 @@ static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_
static Pvoid_t workers_by_pid_JudyL_array = NULL;
static void workers_threads_cleanup(struct worker_utilization *wu) {
- netdata_thread_disable_cancelability();
-
struct worker_thread *t = wu->threads;
while(t) {
struct worker_thread *next = t->next;
@@ -2487,8 +2582,6 @@ static void workers_threads_cleanup(struct worker_utilization *wu) {
}
t = next;
}
-
- netdata_thread_enable_cancelability();
}
static struct worker_thread *worker_thread_find(struct worker_utilization *wu __maybe_unused, pid_t pid) {
@@ -2621,16 +2714,20 @@ static void worker_utilization_charts(void) {
static size_t iterations = 0;
iterations++;
- int i;
- for(i = 0; all_workers_utilization[i].name ;i++) {
+ for(int i = 0; all_workers_utilization[i].name ;i++) {
workers_utilization_reset_statistics(&all_workers_utilization[i]);
+
+ netdata_thread_disable_cancelability();
workers_foreach(all_workers_utilization[i].name, worker_utilization_charts_callback, &all_workers_utilization[i]);
+ netdata_thread_enable_cancelability();
// skip the first iteration, so that we don't accumulate startup utilization to our charts
if(likely(iterations > 1))
workers_utilization_update_chart(&all_workers_utilization[i]);
+ netdata_thread_disable_cancelability();
workers_threads_cleanup(&all_workers_utilization[i]);
+ netdata_thread_enable_cancelability();
}
workers_total_cpu_utilization_chart();
@@ -2672,11 +2769,12 @@ static void global_statistics_register_workers(void) {
worker_register("STATS");
worker_register_job_name(WORKER_JOB_GLOBAL, "global");
worker_register_job_name(WORKER_JOB_REGISTRY, "registry");
- worker_register_job_name(WORKER_JOB_WORKERS, "workers");
worker_register_job_name(WORKER_JOB_DBENGINE, "dbengine");
worker_register_job_name(WORKER_JOB_STRINGS, "strings");
worker_register_job_name(WORKER_JOB_DICTIONARIES, "dictionaries");
worker_register_job_name(WORKER_JOB_MALLOC_TRACE, "malloc_trace");
+ worker_register_job_name(WORKER_JOB_WORKERS, "workers");
+ worker_register_job_name(WORKER_JOB_SQLITE3, "sqlite3");
}
static void global_statistics_cleanup(void *ptr)
@@ -2719,6 +2817,9 @@ void *global_statistics_main(void *ptr)
worker_is_busy(WORKER_JOB_GLOBAL);
global_statistics_charts();
+ worker_is_busy(WORKER_JOB_SQLITE3);
+ sqlite3_statistics_charts();
+
worker_is_busy(WORKER_JOB_REGISTRY);
registry_statistics();
diff --git a/daemon/service.c b/daemon/service.c
index a7db7ceb7..6db2ef69f 100644
--- a/daemon/service.c
+++ b/daemon/service.c
@@ -46,19 +46,19 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) {
/* only a collector can mark a chart as obsolete, so we must remove the reference */
- size_t tiers_available = 0, tiers_said_yes = 0;
+ size_t tiers_available = 0, tiers_said_no_retention = 0;
for(size_t tier = 0; tier < storage_tiers ;tier++) {
if(rd->tiers[tier]) {
tiers_available++;
if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle))
- tiers_said_yes++;
+ tiers_said_no_retention++;
rd->tiers[tier]->db_collection_handle = NULL;
}
}
- if (tiers_available == tiers_said_yes && tiers_said_yes) {
+ if (tiers_available == tiers_said_no_retention && tiers_said_no_retention) {
/* This metric has no data and no references */
metaqueue_delete_dimension_uuid(&rd->metric_uuid);
}
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index d65cb35a5..4f5da7084 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -524,6 +524,14 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr, "", true);
+ pg_cache_wait_event_unsafe(descr);
+ }
+
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
/* only after locking can it be safely deleted from LRU */
pg_cache_replaceQ_delete(ctx, descr);
@@ -1196,7 +1204,6 @@ struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_insta
page_index->refcount = 0;
page_index->writers = 0;
page_index->ctx = ctx;
- page_index->alignment = NULL;
page_index->latest_update_every_s = default_rrd_update_every;
return page_index;
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 2f4d6b332..635b02123 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -112,7 +112,6 @@ struct pg_cache_page_index {
usec_t latest_time_ut;
struct rrdengine_instance *ctx;
- struct pg_alignment *alignment;
uint32_t latest_update_every_s;
struct pg_cache_page_index *prev;
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index e4cd37e98..a6840f38c 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -272,9 +272,19 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type)
}
}
+struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s)
+{
+ uv_rwlock_rdlock(&page_index->lock);
+ Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0);
+ struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ uv_rwlock_rdunlock(&page_index->lock);
+ return descr;
+};
+
static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
{
struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
int ret;
@@ -365,19 +375,30 @@ after_crc_check:
}
}
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t));
+ struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL;
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+
+
for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) {
uint8_t is_prefetched_page;
descr = NULL;
for (j = 0 ; j < xt_io_descr->descr_count; ++j) {
- struct rrdeng_page_descr *descrj;
+ struct rrdeng_page_descr descrj;
- descrj = xt_io_descr->descr_array[j];
+ descrj = xt_io_descr->descr_read_array[j];
/* care, we don't hold the descriptor mutex */
- if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) &&
- header->descr[i].page_length == descrj->page_length &&
- header->descr[i].start_time_ut == descrj->start_time_ut &&
- header->descr[i].end_time_ut == descrj->end_time_ut) {
- descr = descrj;
+ if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) &&
+ header->descr[i].page_length == descrj.page_length &&
+ header->descr[i].start_time_ut == descrj.start_time_ut &&
+ header->descr[i].end_time_ut == descrj.end_time_ut) {
+ //descr = descrj;
+ descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC));
+ if (unlikely(!descr)) {
+ error_limit_static_thread_var(erl, 1, 0);
+ error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__);
+ }
break;
}
}
@@ -506,6 +527,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
xt_io_descr->descr_array[i] = descr[i];
+ xt_io_descr->descr_read_array[i] = *(descr[i]);
}
xt_io_descr->descr_count = count;
xt_io_descr->file = datafile->file;
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index fedadbe86..521d2521a 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -41,6 +41,7 @@ struct rrdeng_collect_handle {
unsigned long page_correlation_id;
// set to 1 when this dimension is not page aligned with the other dimensions in the chart
uint8_t unaligned_page;
+ struct pg_alignment *alignment;
};
struct rrdeng_query_handle {
@@ -117,6 +118,7 @@ struct extent_io_descriptor {
unsigned descr_count;
int release_descr;
struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT];
Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
};
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 27503baee..4525b041f 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -39,21 +39,35 @@ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
// ----------------------------------------------------------------------------
// metrics groups
+static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return;
+ __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
+}
+
+static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return true;
+
+ if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
+ freez(pa);
+ return true;
+ }
+
+ return false;
+}
+
+// charts call this
STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
- return callocz(1, sizeof(struct pg_alignment));
+ struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment));
+ rrdeng_page_alignment_acquire(pa);
+ return (STORAGE_METRICS_GROUP *)pa;
}
-void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
- if(!smg) return;
+// charts call this
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) {
+ if(unlikely(!smg)) return;
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
struct pg_alignment *pa = (struct pg_alignment *)smg;
- struct page_cache *pg_cache = &ctx->pg_cache;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- if(pa->refcount == 0)
- freez(pa);
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ rrdeng_page_alignment_release(pa);
}
// ----------------------------------------------------------------------------
@@ -93,10 +107,10 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
uuid_t legacy_uuid;
rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
- return rrdeng_metric_get(db_instance, &legacy_uuid, smg);
+ return rrdeng_metric_get(db_instance, &legacy_uuid);
}
// ----------------------------------------------------------------------------
@@ -105,11 +119,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, c
void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
- if(refcount == 0 && page_index->alignment) {
- __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST);
- page_index->alignment = NULL;
- }
+ __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
}
STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
@@ -118,9 +128,8 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle
return db_metric_handle;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct pg_alignment *pa = (struct pg_alignment *)smg;
struct page_cache *pg_cache = &ctx->pg_cache;
struct pg_cache_page_index *page_index = NULL;
@@ -130,27 +139,16 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (likely(page_index)) {
+ if (likely(page_index))
__atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
- if(pa) {
- if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0)
- fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).",
- page_index->refcount, page_index->writers);
-
- page_index->alignment = pa;
- __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
- }
- }
-
return (STORAGE_METRIC_HANDLE *)page_index;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct pg_alignment *pa = (struct pg_alignment *)smg;
struct pg_cache_page_index *page_index;
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -160,28 +158,25 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_
*PValue = page_index = create_page_index(uuid, ctx);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
- page_index->alignment = pa;
page_index->refcount = 1;
- if(pa)
- pa->refcount++;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
return (STORAGE_METRIC_HANDLE *)page_index;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
STORAGE_METRIC_HANDLE *db_metric_handle;
- db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg);
+ db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid);
if(!db_metric_handle) {
- db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg);
+ db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
if(db_metric_handle) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
uuid_copy(rd->metric_uuid, page_index->id);
}
}
if(!db_metric_handle)
- db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg);
+ db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid);
#ifdef NETDATA_INTERNAL_CHECKS
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
@@ -210,19 +205,19 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) {
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
struct rrdeng_collect_handle *handle;
- if(!page_index->alignment)
- fatal("DBENGINE: metric group is required for collect operations");
-
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
handle->page_index = page_index;
handle->descr = NULL;
handle->unaligned_page = 0;
page_index->latest_update_every_s = update_every;
+ handle->alignment = (struct pg_alignment *)smg;
+ rrdeng_page_alignment_acquire(handle->alignment);
+
uv_rwlock_wrlock(&page_index->lock);
++page_index->writers;
uv_rwlock_wrunlock(&page_index->lock);
@@ -331,18 +326,18 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
}
#endif
- if (descr->page_length == page_index->alignment->page_length) {
+ if (descr->page_length == handle->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
/* is the metric far enough out of alignment with the others? */
- if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) {
+ if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) {
handle->unaligned_page = 1;
print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
}
if (unlikely(handle->unaligned_page &&
/* did the other metrics change page? */
- page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
+ handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
must_flush_unaligned_page = 1;
handle->unaligned_page = 0;
@@ -365,7 +360,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
- if (0 == page_index->alignment->page_length) {
+ if (0 == handle->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
@@ -403,7 +398,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
if (perfect_page_alignment)
- page_index->alignment->page_length = descr->page_length;
+ handle->alignment->page_length = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time_ut)) {
unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
descr->start_time_ut = point_in_time_ut;
@@ -530,10 +525,13 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
rrdeng_store_metric_flush_current_page(collection_handle);
uv_rwlock_wrlock(&page_index->lock);
- if (!--page_index->writers && !page_index->page_count) {
+
+ if (!--page_index->writers && !page_index->page_count)
can_delete_metric = 1;
- }
+
uv_rwlock_wrunlock(&page_index->lock);
+
+ rrdeng_page_alignment_release(handle->alignment);
freez(handle);
return can_delete_metric;
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 85375044f..3acee4ec6 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -42,14 +42,14 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu
uuid_t *ret_uuid);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
-STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id);
void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every);
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n,
diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c
index 43f32350b..299b6557a 100644
--- a/database/ram/rrddim_mem.c
+++ b/database/ram/rrddim_mem.c
@@ -22,8 +22,8 @@ void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused,
// RRDDIM legacy data collection functions
STORAGE_METRIC_HANDLE *
-rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) {
- STORAGE_METRIC_HANDLE *t = rrddim_metric_get(db_instance, &rd->metric_uuid, smg);
+rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) {
+ STORAGE_METRIC_HANDLE *t = rrddim_metric_get(db_instance, &rd->metric_uuid);
if(!t) {
netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock);
Pvoid_t *PValue = JudyHSIns(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0);
@@ -40,7 +40,7 @@ rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_un
}
STORAGE_METRIC_HANDLE *
-rrddim_metric_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid, STORAGE_METRICS_GROUP *smg __maybe_unused) {
+rrddim_metric_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid) {
RRDDIM *rd = NULL;
netdata_rwlock_rdlock(&rrddim_JudyHS_rwlock);
Pvoid_t *PValue = JudyHSGet(rrddim_JudyHS_array, uuid, sizeof(uuid_t));
@@ -67,7 +67,7 @@ void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *col
rrddim_store_metric_flush(collection_handle);
}
-STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every __maybe_unused) {
+STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) {
RRDDIM *rd = (RRDDIM *)db_metric_handle;
rd->db[rd->rrdset->current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
struct mem_collect_handle *ch = callocz(1, sizeof(struct mem_collect_handle));
diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h
index 297388f51..79c59f110 100644
--- a/database/ram/rrddim_mem.h
+++ b/database/ram/rrddim_mem.h
@@ -20,15 +20,15 @@ struct mem_query_handle {
size_t last_slot;
};
-STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
-STORAGE_METRIC_HANDLE *rrddim_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
+STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
+STORAGE_METRIC_HANDLE *rrddim_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
STORAGE_METRIC_HANDLE *rrddim_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
void rrddim_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
-STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every);
+STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number,
NETDATA_DOUBLE min_value,
diff --git a/database/rrd.h b/database/rrd.h
index f071ee254..0796ff901 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -409,7 +409,7 @@ typedef struct storage_query_handle STORAGE_QUERY_HANDLE;
// function pointers that handle data collection
struct storage_engine_collect_ops {
// an initialization function to run before starting collection
- STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every);
+ STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
// run this to store each metric into the database
void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value,
@@ -464,8 +464,8 @@ typedef struct storage_engine STORAGE_ENGINE;
// function pointers for all APIs provided by a storage engine
typedef struct storage_engine_api {
// metric management
- STORAGE_METRIC_HANDLE *(*metric_get)(STORAGE_INSTANCE *instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg);
- STORAGE_METRIC_HANDLE *(*metric_get_or_create)(RRDDIM *rd, STORAGE_INSTANCE *instance, STORAGE_METRICS_GROUP *smg);
+ STORAGE_METRIC_HANDLE *(*metric_get)(STORAGE_INSTANCE *instance, uuid_t *uuid);
+ STORAGE_METRIC_HANDLE *(*metric_get_or_create)(RRDDIM *rd, STORAGE_INSTANCE *instance);
void (*metric_release)(STORAGE_METRIC_HANDLE *);
STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *);
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index cfa8af3e0..3413d1ea8 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -29,7 +29,7 @@
#define WORKER_JOB_PP_QUEUE_SIZE 13
-typedef enum {
+typedef enum __attribute__ ((__packed__)) {
RRD_FLAG_NONE = 0,
RRD_FLAG_DELETED = (1 << 0), // this is a deleted object (metrics, instances, contexts)
RRD_FLAG_COLLECTED = (1 << 1), // this object is currently being collected
@@ -115,6 +115,7 @@ typedef enum {
static inline void
rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) {
RRD_FLAGS expected, desired;
+
do {
expected = *flags;
@@ -2435,7 +2436,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
if(rm->rrddim && rm->rrddim->tiers[tier] && rm->rrddim->tiers[tier]->db_metric_handle)
tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier]->db_metric_handle);
else
- tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid, NULL);
+ tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid);
if(tier_retention[tier].db_metric_handle) {
tier_retention[tier].db_first_time_t = tier_retention[tier].eng->api.query_ops.oldest_time(tier_retention[tier].db_metric_handle);
@@ -3253,8 +3254,6 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
"RRDCONTEXT: context '%s' of host '%s', deleted from rrdmetrics dictionary.",
string2str(rc->id),
rrdhost_hostname(host));
-
- fprintf(stderr, "RRDCONTEXT: deleted context '%s'", string2str(rc->id));
}
// the item is referenced in the dictionary
diff --git a/database/rrddim.c b/database/rrddim.c
index 1b3d9952c..2d909a701 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -112,7 +112,7 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
rd->tiers[tier]->tier_grouping = host->db[tier].tier_grouping;
rd->tiers[tier]->collect_ops = &eng->api.collect_ops;
rd->tiers[tier]->query_ops = &eng->api.query_ops;
- rd->tiers[tier]->db_metric_handle = eng->api.metric_get_or_create(rd, host->db[tier].instance, rd->rrdset->storage_metrics_groups[tier]);
+ rd->tiers[tier]->db_metric_handle = eng->api.metric_get_or_create(rd, host->db[tier].instance);
storage_point_unset(rd->tiers[tier]->virtual_point);
initialized++;
@@ -131,7 +131,7 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
size_t initialized = 0;
for (size_t tier = 0; tier < storage_tiers; tier++) {
if (rd->tiers[tier]) {
- rd->tiers[tier]->db_collection_handle = rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every);
+ rd->tiers[tier]->db_collection_handle = rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every, rd->rrdset->storage_metrics_groups[tier]);
initialized++;
}
}
@@ -195,19 +195,19 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
debug(D_RRD_CALLS, "rrddim_free() %s.%s", rrdset_name(st), rrddim_name(rd));
- size_t tiers_available = 0, tiers_said_yes = 0;
+ size_t tiers_available = 0, tiers_said_no_retention = 0;
for(size_t tier = 0; tier < storage_tiers ;tier++) {
if(rd->tiers[tier] && rd->tiers[tier]->db_collection_handle) {
tiers_available++;
if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle))
- tiers_said_yes++;
+ tiers_said_no_retention++;
rd->tiers[tier]->db_collection_handle = NULL;
}
}
- if (tiers_available == tiers_said_yes && tiers_said_yes && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ if (tiers_available == tiers_said_no_retention && tiers_said_no_retention && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
/* This metric has no data and no references */
metaqueue_delete_dimension_uuid(&rd->metric_uuid);
}
@@ -261,7 +261,7 @@ static bool rrddim_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
for(size_t tier = 0; tier < storage_tiers ;tier++) {
if (rd->tiers[tier] && !rd->tiers[tier]->db_collection_handle)
rd->tiers[tier]->db_collection_handle =
- rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every);
+ rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every, rd->rrdset->storage_metrics_groups[tier]);
}
if(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c
index 9c7a61c6e..deca84584 100644
--- a/database/sqlite/sqlite_context.c
+++ b/database/sqlite/sqlite_context.c
@@ -449,7 +449,9 @@ skip_delete:
int sql_context_cache_stats(int op)
{
int count, dummy;
+ netdata_thread_disable_cancelability();
sqlite3_db_status(db_context_meta, op, &count, &dummy, 0);
+ netdata_thread_enable_cancelability();
return count;
}
diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c
index eeb3c3822..ce5487fbf 100644
--- a/database/sqlite/sqlite_functions.c
+++ b/database/sqlite/sqlite_functions.c
@@ -1263,7 +1263,9 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b
int sql_metadata_cache_stats(int op)
{
int count, dummy;
+ netdata_thread_disable_cancelability();
sqlite3_db_status(db_meta, op, &count, &dummy, 0);
+ netdata_thread_enable_cancelability();
return count;
}
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c
index f03da25ab..0277e067f 100644
--- a/libnetdata/dictionary/dictionary.c
+++ b/libnetdata/dictionary/dictionary.c
@@ -3,10 +3,9 @@
#define DICTIONARY_INTERNALS
#include "../libnetdata.h"
-#include <Judy.h>
// runtime flags of the dictionary - must be checked with atomics
-typedef enum {
+typedef enum __attribute__ ((__packed__)) {
DICT_FLAG_NONE = 0,
DICT_FLAG_DESTROYED = (1 << 0), // this dictionary has been destroyed
} DICT_FLAGS;
@@ -23,14 +22,14 @@ typedef enum {
#define is_view_dictionary(dict) ((dict)->master)
#define is_master_dictionary(dict) (!is_view_dictionary(dict))
-typedef enum item_options {
+typedef enum __attribute__ ((__packed__)) item_options {
ITEM_OPTION_NONE = 0,
ITEM_OPTION_ALLOCATED_NAME = (1 << 0), // the name pointer is a STRING
// IMPORTANT: This is 1-bit - to add more change ITEM_OPTIONS_BITS
} ITEM_OPTIONS;
-typedef enum item_flags {
+typedef enum __attribute__ ((__packed__)) item_flags {
ITEM_FLAG_NONE = 0,
ITEM_FLAG_DELETED = (1 << 0), // this item is marked deleted, so it is not available for traversal (deleted from the index too)
ITEM_FLAG_BEING_CREATED = (1 << 1), // this item is currently being created - this flag is removed when construction finishes
@@ -623,7 +622,7 @@ static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM
if(likely(!dict->hooks || !dict->hooks->del_callback))
return;
- // We may execute the delete callback on items deleted from a view,
+ // We may execute delete callback on items deleted from a view,
// because we may have references to it, after the master is gone
// so, the shared structure will remain until the last reference is released.
@@ -782,7 +781,7 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) {
while(item) {
examined++;
- // this will cleanup
+ // this will clean up
item_next = item->next;
int rc = item_check_and_acquire_advanced(dict, item, is_view);
@@ -882,7 +881,7 @@ static void item_acquire(DICTIONARY *dict, DICTIONARY_ITEM *item) {
static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item) {
// this function may be called without any lock on the dictionary
- // or even when someone else has write lock on the dictionary
+ // or even when someone else has 'write' lock on the dictionary
bool is_deleted;
REFCOUNT refcount;
@@ -934,11 +933,11 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
int ret = RC_ITEM_OK;
+ refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
+
do {
spins++;
- refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
-
if(refcount < 0) {
// we can't use this item
ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED;
@@ -953,8 +952,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
desired = refcount + 1;
- } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired,
- false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+ } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
// if ret == ITEM_OK, we acquired the item
@@ -971,7 +969,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
else
pointer_del(dict, item);
- // mark it in our dictionary as deleted too
+ // mark it in our dictionary as deleted too,
// this is safe to be done here, because we have got
// a reference counter on item
dict_item_set_deleted(dict, item);
@@ -1013,11 +1011,11 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY
int ret = RC_ITEM_OK;
+ refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
+
do {
spins++;
- refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
-
if(refcount < 0) {
// we can't use this item
ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED;
@@ -1035,8 +1033,7 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY
ret = RC_ITEM_IS_CURRENTLY_BEING_CREATED;
break;
}
- } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired,
- false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+ } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
#ifdef NETDATA_INTERNAL_CHECKS
if(ret == RC_ITEM_OK)
@@ -1055,8 +1052,7 @@ static inline bool item_shared_release_and_check_if_it_can_be_freed(DICTIONARY *
// if we can set refcount to REFCOUNT_DELETING, we can delete this item
REFCOUNT links = __atomic_sub_fetch(&item->shared->links, 1, __ATOMIC_SEQ_CST);
- if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING,
- false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+ if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
// we can delete it
return true;
@@ -1430,16 +1426,16 @@ static void dict_item_shared_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item
static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) {
ITEM_FLAGS expected, desired;
+ expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST);
+
do {
- expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST);
if (expected & ITEM_FLAG_DELETED)
return false;
desired = expected | ITEM_FLAG_DELETED;
- } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired,
- false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+ } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
DICTIONARY_ENTRIES_MINUS1(dict);
return true;
@@ -1474,7 +1470,7 @@ static inline void dict_item_free_or_mark_deleted(DICTIONARY *dict, DICTIONARY_I
}
// this is used by traversal functions to remove the current item
-// if it is deleted and it has zero references. This will eliminate
+// if it is deleted, and it has zero references. This will eliminate
// the need for the garbage collector to kick-in later.
// Most deletions happen during traversal, so this is a nice hack
// to speed up everything!
@@ -1601,7 +1597,7 @@ static DICTIONARY_ITEM *dict_item_add_or_reset_value_and_acquire(DICTIONARY *dic
hashtable_inserted_item_unsafe(dict, item);
// unlock the index lock, before we add it to the linked list
- // DONT DO IT THE OTHER WAY AROUND - DO NOT CROSS THE LOCKS!
+ // DON'T DO IT THE OTHER WAY AROUND - DO NOT CROSS THE LOCKS!
dictionary_index_wrlock_unlock(dict);
item_linked_list_add(dict, item);
diff --git a/libnetdata/procfile/procfile.c b/libnetdata/procfile/procfile.c
index b4ca025ec..eb04316c3 100644
--- a/libnetdata/procfile/procfile.c
+++ b/libnetdata/procfile/procfile.c
@@ -22,16 +22,21 @@ size_t procfile_max_allocation = PROCFILE_INCREMENT_BUFFER;
// ----------------------------------------------------------------------------
char *procfile_filename(procfile *ff) {
- if(ff->filename[0]) return ff->filename;
+ if(ff->filename)
+ return ff->filename;
+ char filename[FILENAME_MAX + 1];
char buffer[FILENAME_MAX + 1];
snprintfz(buffer, FILENAME_MAX, "/proc/self/fd/%d", ff->fd);
- ssize_t l = readlink(buffer, ff->filename, FILENAME_MAX);
+ ssize_t l = readlink(buffer, filename, FILENAME_MAX);
if(unlikely(l == -1))
- snprintfz(ff->filename, FILENAME_MAX, "unknown filename for fd %d", ff->fd);
+ snprintfz(filename, FILENAME_MAX, "unknown filename for fd %d", ff->fd);
else
- ff->filename[l] = '\0';
+ filename[l] = '\0';
+
+
+ ff->filename = strdupz(filename);
// on non-linux systems, something like this will be needed
// fcntl(ff->fd, F_GETPATH, ff->filename)
@@ -141,8 +146,9 @@ void procfile_close(procfile *ff) {
debug(D_PROCFILE, PF_PREFIX ": Closing file '%s'", procfile_filename(ff));
- if(likely(ff->lines)) procfile_lines_free(ff->lines);
- if(likely(ff->words)) procfile_words_free(ff->words);
+ freez(ff->filename);
+ procfile_lines_free(ff->lines);
+ procfile_words_free(ff->words);
if(likely(ff->fd != -1)) close(ff->fd);
freez(ff);
@@ -319,40 +325,31 @@ procfile *procfile_readall(procfile *ff) {
return ff;
}
-NOINLINE
-static void procfile_set_separators(procfile *ff, const char *separators) {
- static PF_CHAR_TYPE def[256];
- static char initialized = 0;
-
- if(unlikely(!initialized)) {
- // this is thread safe
- // if initialized is zero, multiple threads may be executing
- // this code at the same time, setting in def[] the exact same values
- int i = 256;
- while(i--) {
- if(unlikely(i == '\n' || i == '\r'))
- def[i] = PF_CHAR_IS_NEWLINE;
+static PF_CHAR_TYPE procfile_default_separators[256];
+__attribute__((constructor)) void procfile_initialize_default_separators(void) {
+ int i = 256;
+ while(i--) {
+ if(unlikely(i == '\n' || i == '\r'))
+ procfile_default_separators[i] = PF_CHAR_IS_NEWLINE;
- else if(unlikely(isspace(i) || !isprint(i)))
- def[i] = PF_CHAR_IS_SEPARATOR;
+ else if(unlikely(isspace(i) || !isprint(i)))
+ procfile_default_separators[i] = PF_CHAR_IS_SEPARATOR;
- else
- def[i] = PF_CHAR_IS_WORD;
- }
-
- initialized = 1;
+ else
+ procfile_default_separators[i] = PF_CHAR_IS_WORD;
}
+}
- // copy the default
- PF_CHAR_TYPE *ffs = ff->separators, *ffd = def, *ffe = &def[256];
- while(ffd != ffe)
- *ffs++ = *ffd++;
-
+NOINLINE
+static void procfile_set_separators(procfile *ff, const char *separators) {
// set the separators
if(unlikely(!separators))
separators = " \t=|";
- ffs = ff->separators;
+ // copy the default
+ memcpy(ff->separators, procfile_default_separators, 256 * sizeof(PF_CHAR_TYPE));
+
+ PF_CHAR_TYPE *ffs = ff->separators;
const char *s = separators;
while(*s)
ffs[(int)*s++] = PF_CHAR_IS_SEPARATOR;
@@ -416,8 +413,7 @@ procfile *procfile_open(const char *filename, const char *separators, uint32_t f
procfile *ff = mallocz(sizeof(procfile) + size);
//strncpyz(ff->filename, filename, FILENAME_MAX);
- ff->filename[0] = '\0';
-
+ ff->filename = NULL;
ff->fd = fd;
ff->size = size;
ff->len = 0;
@@ -449,7 +445,8 @@ procfile *procfile_reopen(procfile *ff, const char *filename, const char *separa
// info("PROCFILE: opened '%s' on fd %d", filename, ff->fd);
//strncpyz(ff->filename, filename, FILENAME_MAX);
- ff->filename[0] = '\0';
+ freez(ff->filename);
+ ff->filename = NULL;
ff->flags = flags;
// do not do the separators again if NULL is given
diff --git a/libnetdata/procfile/procfile.h b/libnetdata/procfile/procfile.h
index 5d45e4028..cae4ad484 100644
--- a/libnetdata/procfile/procfile.h
+++ b/libnetdata/procfile/procfile.h
@@ -37,7 +37,7 @@ typedef struct {
#define PROCFILE_FLAG_DEFAULT 0x00000000
#define PROCFILE_FLAG_NO_ERROR_ON_FILE_IO 0x00000001
-typedef enum procfile_separator {
+typedef enum __attribute__ ((__packed__)) procfile_separator {
PF_CHAR_IS_SEPARATOR,
PF_CHAR_IS_NEWLINE,
PF_CHAR_IS_WORD,
@@ -46,17 +46,16 @@ typedef enum procfile_separator {
PF_CHAR_IS_CLOSE
} PF_CHAR_TYPE;
-typedef struct {
- char filename[FILENAME_MAX + 1]; // not populated until profile_filename() is called
-
+typedef struct procfile {
+ char *filename; // not populated until procfile_filename() is called
uint32_t flags;
- int fd; // the file descriptor
- size_t len; // the bytes we have placed into data
- size_t size; // the bytes we have allocated for data
+ int fd; // the file descriptor
+ size_t len; // the bytes we have placed into data
+ size_t size; // the bytes we have allocated for data
pflines *lines;
pfwords *words;
PF_CHAR_TYPE separators[256];
- char data[]; // allocated buffer to keep file contents
+ char data[]; // allocated buffer to keep file contents
} procfile;
// close the proc file and free all related memory
diff --git a/libnetdata/socket/security.c b/libnetdata/socket/security.c
index f7b44049b..88b3f6d93 100644
--- a/libnetdata/socket/security.c
+++ b/libnetdata/socket/security.c
@@ -204,31 +204,43 @@ static SSL_CTX * security_initialize_openssl_server() {
* NETDATA_SSL_CONTEXT_EXPORTING - Starts the OpenTSDB context
*/
void security_start_ssl(int selector) {
+ static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
+ netdata_spinlock_lock(&sp);
+
switch (selector) {
case NETDATA_SSL_CONTEXT_SERVER: {
- struct stat statbuf;
- if (stat(netdata_ssl_security_key, &statbuf) || stat(netdata_ssl_security_cert, &statbuf)) {
- info("To use encryption it is necessary to set \"ssl certificate\" and \"ssl key\" in [web] !\n");
- return;
+ if(!netdata_ssl_srv_ctx) {
+ struct stat statbuf;
+ if (stat(netdata_ssl_security_key, &statbuf) || stat(netdata_ssl_security_cert, &statbuf))
+ info("To use encryption it is necessary to set \"ssl certificate\" and \"ssl key\" in [web] !\n");
+ else {
+ netdata_ssl_srv_ctx = security_initialize_openssl_server();
+ SSL_CTX_set_mode(netdata_ssl_srv_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
+ }
}
-
- netdata_ssl_srv_ctx = security_initialize_openssl_server();
- SSL_CTX_set_mode(netdata_ssl_srv_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
break;
}
+
case NETDATA_SSL_CONTEXT_STREAMING: {
- netdata_ssl_client_ctx = security_initialize_openssl_client();
- //This is necessary for the stream, because it is working sometimes with nonblock socket.
- //It returns the bitmask after to change, there is not any description of errors in the documentation
- SSL_CTX_set_mode(
- netdata_ssl_client_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |SSL_MODE_AUTO_RETRY);
+ if(!netdata_ssl_client_ctx) {
+ netdata_ssl_client_ctx = security_initialize_openssl_client();
+ //This is necessary for the stream, because it is working sometimes with nonblock socket.
+ //It returns the bitmask after to change, there is not any description of errors in the documentation
+ SSL_CTX_set_mode(netdata_ssl_client_ctx,
+ SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |
+ SSL_MODE_AUTO_RETRY);
+ }
break;
}
+
case NETDATA_SSL_CONTEXT_EXPORTING: {
- netdata_ssl_exporting_ctx = security_initialize_openssl_client();
+ if(!netdata_ssl_exporting_ctx)
+ netdata_ssl_exporting_ctx = security_initialize_openssl_client();
break;
}
}
+
+ netdata_spinlock_unlock(&sp);
}
/**
diff --git a/libnetdata/string/string.c b/libnetdata/string/string.c
index a3f74b4ef..d2db8aab4 100644
--- a/libnetdata/string/string.c
+++ b/libnetdata/string/string.c
@@ -71,11 +71,12 @@ void string_statistics(size_t *inserts, size_t *deletes, size_t *searches, size_
static inline bool string_entry_check_and_acquire(STRING *se) {
REFCOUNT expected, desired, count = 0;
+
+ expected = __atomic_load_n(&se->refcount, __ATOMIC_SEQ_CST);
+
do {
count++;
- expected = __atomic_load_n(&se->refcount, __ATOMIC_SEQ_CST);
-
if(expected <= 0) {
// We cannot use this.
// The reference counter reached value zero,
@@ -85,8 +86,8 @@ static inline bool string_entry_check_and_acquire(STRING *se) {
}
desired = expected + 1;
- }
- while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+
+ } while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
string_internal_stats_add(spins, count - 1);
diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c
index 14b8926e0..afaff209b 100644
--- a/libnetdata/worker_utilization/worker_utilization.c
+++ b/libnetdata/worker_utilization/worker_utilization.c
@@ -44,35 +44,55 @@ struct worker {
struct worker *prev;
};
-static netdata_mutex_t workers_base_lock = NETDATA_MUTEX_INITIALIZER;
-static __thread struct worker *worker = NULL;
-static Pvoid_t workers_per_workname_JudyHS_array = NULL;
+struct workers_workname { // this is what we add to JudyHS
+ SPINLOCK spinlock;
+ struct worker *base;
+};
+
+static struct workers_globals {
+ SPINLOCK spinlock;
+ Pvoid_t worknames_JudyHS;
+
+} workers_globals = { // workers globals, the base of all worknames
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER, // a lock for the worknames index
+ .worknames_JudyHS = NULL, // the worknames index
+};
-void worker_register(const char *workname) {
+static __thread struct worker *worker = NULL; // the current thread worker
+
+void worker_register(const char *name) {
if(unlikely(worker)) return;
worker = callocz(1, sizeof(struct worker));
worker->pid = gettid();
worker->tag = strdupz(netdata_thread_tag());
- worker->workname = strdupz(workname);
+ worker->workname = strdupz(name);
usec_t now = now_monotonic_usec();
worker->statistics_last_checkpoint = now;
worker->last_action_timestamp = now;
worker->last_action = WORKER_IDLE;
- size_t workname_size = strlen(workname) + 1;
- netdata_mutex_lock(&workers_base_lock);
+ size_t name_size = strlen(name) + 1;
+ netdata_spinlock_lock(&workers_globals.spinlock);
- Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)workname, workname_size);
+ Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, name_size);
if(!PValue)
- PValue = JudyHSIns(&workers_per_workname_JudyHS_array, (void *)workname, workname_size, PJE0);
+ PValue = JudyHSIns(&workers_globals.worknames_JudyHS, (void *)name, name_size, PJE0);
+
+ struct workers_workname *workname = *PValue;
+ if(!workname) {
+ workname = mallocz(sizeof(struct workers_workname));
+ workname->spinlock = NETDATA_SPINLOCK_INITIALIZER;
+ workname->base = NULL;
+ *PValue = workname;
+ }
- struct worker *base = *PValue;
- DOUBLE_LINKED_LIST_APPEND_UNSAFE(base, worker, prev, next);
- *PValue = base;
+ netdata_spinlock_lock(&workname->spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(workname->base, worker, prev, next);
+ netdata_spinlock_unlock(&workname->spinlock);
- netdata_mutex_unlock(&workers_base_lock);
+ netdata_spinlock_unlock(&workers_globals.spinlock);
}
void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type) {
@@ -105,17 +125,20 @@ void worker_unregister(void) {
if(unlikely(!worker)) return;
size_t workname_size = strlen(worker->workname) + 1;
- netdata_mutex_lock(&workers_base_lock);
- Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)worker->workname, workname_size);
+ netdata_spinlock_lock(&workers_globals.spinlock);
+ Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)worker->workname, workname_size);
if(PValue) {
- struct worker *base = *PValue;
- DOUBLE_LINKED_LIST_REMOVE_UNSAFE(base, worker, prev, next);
- *PValue = base;
-
- if(!base)
- JudyHSDel(&workers_per_workname_JudyHS_array, (void *)worker->workname, workname_size, PJE0);
+ struct workers_workname *workname = *PValue;
+ netdata_spinlock_lock(&workname->spinlock);
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(workname->base, worker, prev, next);
+ netdata_spinlock_unlock(&workname->spinlock);
+
+ if(!workname->base) {
+ JudyHSDel(&workers_globals.worknames_JudyHS, (void *) worker->workname, workname_size, PJE0);
+ freez(workname);
+ }
}
- netdata_mutex_unlock(&workers_base_lock);
+ netdata_spinlock_unlock(&workers_globals.spinlock);
for(int i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) {
string_freez(worker->per_job_type[i].name);
@@ -187,7 +210,7 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) {
// statistics interface
-void workers_foreach(const char *workname, void (*callback)(
+void workers_foreach(const char *name, void (*callback)(
void *data
, pid_t pid
, const char *thread_tag
@@ -203,18 +226,27 @@ void workers_foreach(const char *workname, void (*callback)(
, NETDATA_DOUBLE *job_custom_values
)
, void *data) {
- netdata_mutex_lock(&workers_base_lock);
+ netdata_spinlock_lock(&workers_globals.spinlock);
usec_t busy_time, delta;
size_t i, jobs_started, jobs_running;
- size_t workname_size = strlen(workname) + 1;
- struct worker *base = NULL;
- Pvoid_t *PValue = JudyHSGet(workers_per_workname_JudyHS_array, (void *)workname, workname_size);
- if(PValue)
- base = *PValue;
+ size_t workname_size = strlen(name) + 1;
+ struct workers_workname *workname;
+ Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, workname_size);
+ if(PValue) {
+ workname = *PValue;
+ netdata_spinlock_lock(&workname->spinlock);
+ }
+ else
+ workname = NULL;
+
+ netdata_spinlock_unlock(&workers_globals.spinlock);
+
+ if(!workname)
+ return;
struct worker *p;
- DOUBLE_LINKED_LIST_FOREACH_FORWARD(base, p, prev, next) {
+ DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) {
usec_t now = now_monotonic_usec();
// find per job type statistics
@@ -326,5 +358,5 @@ void workers_foreach(const char *workname, void (*callback)(
);
}
- netdata_mutex_unlock(&workers_base_lock);
+ netdata_spinlock_unlock(&workname->spinlock);
}
diff --git a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h
index 04d24f1f7..f1412e6b4 100644
--- a/libnetdata/worker_utilization/worker_utilization.h
+++ b/libnetdata/worker_utilization/worker_utilization.h
@@ -15,7 +15,7 @@ typedef enum {
WORKER_METRIC_INCREMENTAL_TOTAL = 4,
} WORKER_METRIC_TYPE;
-void worker_register(const char *workname);
+void worker_register(const char *name);
void worker_register_job_name(size_t job_id, const char *name);
void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type);
void worker_unregister(void);
@@ -26,7 +26,7 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value);
// statistics interface
-void workers_foreach(const char *workname, void (*callback)(
+void workers_foreach(const char *name, void (*callback)(
void *data
, pid_t pid
, const char *thread_tag
diff --git a/packaging/version b/packaging/version
index b909b32cd..094f172cf 100644
--- a/packaging/version
+++ b/packaging/version
@@ -1 +1 @@
-v1.37.0
+v1.37.1
diff --git a/streaming/replication.c b/streaming/replication.c
index 8fa501061..d659d701d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -4,7 +4,7 @@
#include "Judy.h"
#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
#define WORKER_JOB_FIND_NEXT 1
@@ -14,17 +14,17 @@
#define WORKER_JOB_CHECK_CONSISTENCY 5
#define WORKER_JOB_BUFFER_COMMIT 6
#define WORKER_JOB_CLEANUP 7
+#define WORKER_JOB_WAIT 8
// master thread worker jobs
-#define WORKER_JOB_STATISTICS 8
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 11
-#define WORKER_JOB_CUSTOM_METRIC_DONE 12
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14
-#define WORKER_JOB_CUSTOM_METRIC_WAITS 15
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+#define WORKER_JOB_STATISTICS 9
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
+#define WORKER_JOB_CUSTOM_METRIC_DONE 14
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
@@ -591,6 +591,7 @@ struct replication_request {
Word_t unique_id; // auto-increment, later requests have bigger
bool found; // used as a result boolean for the find call
bool indexed_in_judy; // true when the request is indexed in judy
+ bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
};
// replication sort entry in JudyL array
@@ -605,7 +606,7 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
- netdata_mutex_t mutex;
+ SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
@@ -614,9 +615,8 @@ static struct replication_thread {
// statistics
size_t added; // number of requests added to the queue
size_t removed; // number of requests removed from the queue
- size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent
- size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses
-// size_t skipped_no_room_since_last_reset;
+ size_t pending_no_room; // number of requests skipped, because the sender has no room for responses
+ size_t senders_full; // number of times a sender reset our last position in the queue
size_t sender_resets; // number of times a sender reset our last position in the queue
time_t first_time_t; // the minimum 'after' we encountered
@@ -634,7 +634,6 @@ static struct replication_thread {
} atomic; // access should be with atomic operations
struct {
- size_t waits;
size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
netdata_thread_t **threads_ptrs;
@@ -642,17 +641,16 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
- .mutex = NETDATA_MUTEX_INITIALIZER,
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
.unique_id = 0,
.added = 0,
.removed = 0,
- .skipped_not_connected = 0,
- .skipped_no_room = 0,
-// .skipped_no_room_since_last_reset = 0,
+ .pending_no_room = 0,
.sender_resets = 0,
+ .senders_full = 0,
.first_time_t = 0,
@@ -667,7 +665,6 @@ static struct replication_thread {
.latest_first_time = 0,
},
.main_thread = {
- .waits = 0,
.last_executed = 0,
.threads = 0,
.threads_ptrs = NULL,
@@ -682,11 +679,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
- netdata_mutex_lock(&replication_globals.mutex);
+ netdata_spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
- netdata_mutex_unlock(&replication_globals.mutex);
+ netdata_spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
@@ -736,6 +733,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = false;
return rse;
}
@@ -743,9 +741,20 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
}
-static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
+static void replication_sort_entry_add(struct replication_request *rq) {
replication_recursive_lock();
+ if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = true;
+ replication_globals.unsafe.pending_no_room++;
+ replication_recursive_unlock();
+ return;
+ }
+
+ if(rq->not_indexed_buffer_full)
+ replication_globals.unsafe.pending_no_room--;
+
struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
@@ -770,13 +779,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
rq->indexed_in_judy = true;
+ rq->not_indexed_buffer_full = false;
if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
-
- return rse;
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
@@ -806,7 +814,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
return inner_judy_deleted;
}
-static void replication_sort_entry_del(struct replication_request *rq) {
+static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
Pvoid_t *inner_judy_pptr;
struct replication_sort_entry *rse_to_delete = NULL;
@@ -819,6 +827,11 @@ static void replication_sort_entry_del(struct replication_request *rq) {
if (our_item_pptr) {
rse_to_delete = *our_item_pptr;
replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+
+ if(buffer_full) {
+ replication_globals.unsafe.pending_no_room++;
+ rq->not_indexed_buffer_full = true;
+ }
}
}
@@ -877,44 +890,17 @@ static struct replication_request replication_request_get_first_available() {
while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
struct replication_request *rq = rse->rq;
- struct sender_state *s = rq->sender;
-
- if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
- // there is room for this request in the sender buffer
-
- bool sender_is_connected =
- rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
-
- bool sender_has_been_flushed_since_this_request =
- rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
- if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
- // skip this request, the sender is not connected, or it has reconnected
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
- replication_globals.unsafe.skipped_not_connected++;
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
- }
- else {
- // this request is good to execute
+ // set the return result to found
+ rq_to_return.found = true;
- // copy the request to return it
- rq_to_return = *rq;
- rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
-
- // set the return result to found
- rq_to_return.found = true;
-
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
- }
- }
- else {
- replication_globals.unsafe.skipped_no_room++;
-// replication_globals.protected.skipped_no_room_since_last_reset++;
- }
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
}
// call JudyLNext from now on
@@ -959,7 +945,20 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
replication_recursive_lock();
- if(!rq->indexed_in_judy) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ // we can replace this command
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+
+ rq->after = rq_new->after;
+ rq->before = rq_new->before;
+ rq->start_streaming = rq_new->start_streaming;
+ }
+ else if(!rq->indexed_in_judy) {
replication_sort_entry_add(rq);
internal_error(
true,
@@ -991,7 +990,13 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
rrdpush_sender_replicating_charts_minus_one(rq->sender);
if(rq->indexed_in_judy)
- replication_sort_entry_del(rq);
+ replication_sort_entry_del(rq, false);
+
+ else if(rq->not_indexed_buffer_full) {
+ replication_recursive_lock();
+ replication_globals.unsafe.pending_no_room--;
+ replication_recursive_unlock();
+ }
string_freez(rq->chart_id);
}
@@ -1046,6 +1051,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
cleanup:
string_freez(rq->chart_id);
+ worker_is_idle();
return ret;
}
@@ -1060,6 +1066,8 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.before = before,
.start_streaming = start_streaming,
.sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
+ .indexed_in_judy = false,
+ .not_indexed_buffer_full = false,
};
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
@@ -1094,15 +1102,36 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
- if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
- s->replication.unsafe.reached_max = true;
+ if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, true);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) {
+ replication_sort_entry_del(rq, true);
+ }
+ }
+ dfe_done(rq);
- if(s->replication.unsafe.reached_max &&
- percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
- s->replication.unsafe.reached_max = false;
replication_recursive_lock();
-// replication_set_next_point_in_time(0, 0);
+ replication_globals.unsafe.senders_full++;
+ replication_recursive_unlock();
+ }
+ else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, false);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ replication_sort_entry_add(rq);
+ }
+ }
+ dfe_done(rq);
+
+ replication_recursive_lock();
+ replication_globals.unsafe.senders_full--;
replication_globals.unsafe.sender_resets++;
+ // replication_set_next_point_in_time(0, 0);
replication_recursive_unlock();
}
@@ -1188,17 +1217,17 @@ static void replication_initialize_workers(bool master) {
worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
+ worker_register_job_name(WORKER_JOB_WAIT, "wait");
if(master) {
worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
}
}
@@ -1210,8 +1239,10 @@ static int replication_execute_next_pending_request(void) {
worker_is_busy(WORKER_JOB_FIND_NEXT);
struct replication_request rq = replication_request_get_first_available();
- if(unlikely(!rq.found))
+ if(unlikely(!rq.found)) {
+ worker_is_idle();
return REQUEST_QUEUE_EMPTY;
+ }
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);
@@ -1221,9 +1252,12 @@ static int replication_execute_next_pending_request(void) {
replication_set_latest_first_time(rq.after);
- if(unlikely(!replication_execute_request(&rq, true)))
+ if(unlikely(!replication_execute_request(&rq, true))) {
+ worker_is_idle();
return REQUEST_CHART_NOT_FOUND;
+ }
+ worker_is_idle();
return REQUEST_OK;
}
@@ -1238,6 +1272,7 @@ static void *replication_worker_thread(void *ptr) {
while(!netdata_exit) {
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
}
@@ -1305,6 +1340,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
last_now_mono_ut = now_mono_ut;
+ worker_is_busy(WORKER_JOB_STATISTICS);
replication_recursive_lock();
size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
@@ -1321,19 +1357,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
}
- if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) {
- // reset the statistics about completion percentage
- replication_globals.unsafe.first_time_t = 0;
- replication_set_latest_first_time(0);
+ if(--run_verification_countdown == 0) {
+ if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
+ // reset the statistics about completion percentage
+ replication_globals.unsafe.first_time_t = 0;
+ replication_set_latest_first_time(0);
- verify_all_hosts_charts_are_streaming_now();
+ verify_all_hosts_charts_are_streaming_now();
- run_verification_countdown = LONG_MAX;
- slow = true;
+ run_verification_countdown = LONG_MAX;
+ slow = true;
+ }
+ else
+ run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
}
- worker_is_busy(WORKER_JOB_STATISTICS);
-
time_t latest_first_time_t = replication_get_latest_first_time();
if(latest_first_time_t && replication_globals.unsafe.pending) {
// completion percentage statistics
@@ -1349,15 +1387,17 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);
replication_recursive_unlock();
+ worker_is_idle();
}
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+
+ worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
// the timeout also defines now frequently we will traverse all the pending requests
@@ -1388,7 +1428,6 @@ void *replication_thread_main(void *ptr __maybe_unused) {
last_sender_resets = replication_globals.unsafe.sender_resets;
}
- replication_globals.main_thread.waits++;
replication_recursive_unlock();
worker_is_idle();
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c5f7618c1..a0c7e8de2 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -168,12 +168,9 @@ struct sender_state {
struct {
size_t pending_requests; // the currently outstanding replication requests
size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+ bool reached_max; // true when the sender buffer should not get more replication responses
} atomic;
- struct {
- bool reached_max; // used to avoid resetting the replication thread too frequently
- } unsafe; // protected by sender mutex
-
} replication;
struct {
@@ -182,10 +179,13 @@ struct sender_state {
} atomic;
};
-#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED);
+#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
+#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
+
+#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
-#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
diff --git a/streaming/sender.c b/streaming/sender.c
index 8e637d2bd..62097e39f 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1114,9 +1114,14 @@ void *rrdpush_sender_thread(void *ptr) {
}
#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
- security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
- ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) {
+ static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
+ netdata_spinlock_lock(&sp);
+ if(!netdata_ssl_client_ctx) {
+ security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
+ ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ }
+ netdata_spinlock_unlock(&sp);
}
#endif
diff --git a/web/api/queries/query.c b/web/api/queries/query.c
index ccd195135..0365b6e96 100644
--- a/web/api/queries/query.c
+++ b/web/api/queries/query.c
@@ -1496,15 +1496,15 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now)
struct storage_engine_query_handle handle;
// for each lower tier
- for(int tr = (int)tier - 1; tr >= 0 ;tr--){
- time_t smaller_tier_first_time = rd->tiers[tr]->query_ops->oldest_time(rd->tiers[tr]->db_metric_handle);
- time_t smaller_tier_last_time = rd->tiers[tr]->query_ops->latest_time(rd->tiers[tr]->db_metric_handle);
+ for(int read_tier = (int)tier - 1; read_tier >= 0 ; read_tier--){
+ time_t smaller_tier_first_time = rd->tiers[read_tier]->query_ops->oldest_time(rd->tiers[read_tier]->db_metric_handle);
+ time_t smaller_tier_last_time = rd->tiers[read_tier]->query_ops->latest_time(rd->tiers[read_tier]->db_metric_handle);
if(smaller_tier_last_time <= latest_time_t) continue; // it is as bad as we are
long after_wanted = (latest_time_t < smaller_tier_first_time) ? smaller_tier_first_time : latest_time_t;
long before_wanted = smaller_tier_last_time;
- struct rrddim_tier *tmp = rd->tiers[tr];
+ struct rrddim_tier *tmp = rd->tiers[read_tier];
tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted);
size_t points_read = 0;
@@ -1516,7 +1516,7 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now)
if(sp.end_time > latest_time_t) {
latest_time_t = sp.end_time;
- store_metric_at_tier(rd, tr, t, sp, sp.end_time * USEC_PER_SEC);
+ store_metric_at_tier(rd, tier, t, sp, sp.end_time * USEC_PER_SEC);
}
}