summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
Diffstat (limited to 'database')
-rw-r--r--database/README.md2
-rw-r--r--database/engine/README.md2
-rw-r--r--database/engine/datafile.c1
-rw-r--r--database/engine/journalfile.c3
-rw-r--r--database/rrd.c3
-rw-r--r--database/rrd.h29
-rw-r--r--database/rrdcalctemplate.c3
-rw-r--r--database/rrddim.c66
-rw-r--r--database/rrdhost.c142
-rw-r--r--database/rrdset.c140
-rw-r--r--database/sqlite/sqlite_aclk.c15
-rw-r--r--database/sqlite/sqlite_aclk.h2
-rw-r--r--database/sqlite/sqlite_aclk_alert.c128
-rw-r--r--database/sqlite/sqlite_aclk_alert.h10
-rw-r--r--database/sqlite/sqlite_aclk_chart.c601
-rw-r--r--database/sqlite/sqlite_aclk_chart.h27
-rw-r--r--database/sqlite/sqlite_aclk_node.c30
-rw-r--r--database/sqlite/sqlite_functions.c167
-rw-r--r--database/sqlite/sqlite_functions.h2
-rw-r--r--database/sqlite/sqlite_health.c7
-rw-r--r--database/sqlite/sqlite_health.h2
21 files changed, 977 insertions, 405 deletions
diff --git a/database/README.md b/database/README.md
index 9fef7058..a8bb21e4 100644
--- a/database/README.md
+++ b/database/README.md
@@ -212,4 +212,4 @@ Netdata will create charts for kernel memory de-duplication performance, like th
![image](https://cloud.githubusercontent.com/assets/2662304/11998786/eb23ae54-aab6-11e5-94d4-e848e8a5c56a.png)
-[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
+
diff --git a/database/engine/README.md b/database/engine/README.md
index a782716f..7defcce9 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -256,4 +256,4 @@ and generate a read load of 1.7M/sec, whereas in the CPU-bound scenario the read
Consequently, there is a significant degree of interference by the reader threads, that slow down the writer threads.
This is also possible because the interference effects are greater than the SSD impact on data generation throughput.
-[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
+
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index d4231107..46d7a8f1 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -159,6 +159,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
}
+ memset(superblock, 0, sizeof(*superblock));
(void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
(void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
superblock->tier = 1;
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 64065616..1541eb10 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -210,6 +210,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
}
+ memset(superblock, 0, sizeof(*superblock));
(void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
(void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
@@ -428,7 +429,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
iov = uv_buf_init(buf, size_bytes);
ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
if (ret < 0) {
- error("uv_fs_read: pos=%lu, %s", pos, uv_strerror(ret));
+ error("uv_fs_read: pos=%"PRIu64", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto skip_file;
}
diff --git a/database/rrd.c b/database/rrd.c
index dcab6518..321d3561 100644
--- a/database/rrd.c
+++ b/database/rrd.c
@@ -140,6 +140,7 @@ const char *rrdset_type_name(RRDSET_TYPE chart_type) {
// RRD - cache directory
char *rrdset_cache_dir(RRDHOST *host, const char *id, const char *config_section) {
+ UNUSED(config_section);
char *ret = NULL;
char b[FILENAME_MAX + 1];
@@ -147,7 +148,7 @@ char *rrdset_cache_dir(RRDHOST *host, const char *id, const char *config_section
rrdset_strncpyz_name(b, id, FILENAME_MAX);
snprintfz(n, FILENAME_MAX, "%s/%s", host->cache_dir, b);
- ret = config_get(config_section, "cache directory", n);
+ ret = strdupz(n);
if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
int r = mkdir(ret, 0775);
diff --git a/database/rrd.h b/database/rrd.h
index 3e0daa16..071e1d03 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -53,7 +53,6 @@ struct context_param {
uint8_t flags;
};
-#define RRDSET_MINIMUM_LIVE_COUNT 3
#define META_CHART_UPDATED 1
#define META_PLUGIN_UPDATED 2
#define META_MODULE_UPDATED 4
@@ -168,7 +167,9 @@ typedef enum rrddim_flags {
// No new values have been collected for this dimension since agent start or it was marked RRDDIM_FLAG_OBSOLETE at
// least rrdset_free_obsolete_time seconds ago.
RRDDIM_FLAG_ARCHIVED = (1 << 3),
- RRDDIM_FLAG_ACLK = (1 << 4)
+ RRDDIM_FLAG_ACLK = (1 << 4),
+
+ RRDDIM_FLAG_PENDING_FOREACH_ALARM = (1 << 5), // set when foreach alarm has not been initialized yet
} RRDDIM_FLAGS;
#ifdef HAVE_C___ATOMIC
@@ -239,6 +240,7 @@ extern void rrdset_add_label_to_new_list(RRDSET *st, char *key, char *value, LAB
extern void rrdset_finalize_labels(RRDSET *st);
extern void rrdset_update_labels(RRDSET *st, struct label *labels);
extern int rrdset_contains_label_keylist(RRDSET *st, char *key);
+extern int rrdset_matches_label_keys(RRDSET *st, char *key, char *words[], uint32_t *hash_key_list, int *word_count, int size);
extern struct label *rrdset_lookup_label_key(RRDSET *st, char *key, uint32_t key_hash);
// ----------------------------------------------------------------------------
@@ -436,6 +438,7 @@ struct rrdset_volatile {
uuid_t hash_id;
struct label *new_labels;
struct label_index labels;
+ bool is_ar_chart;
};
// ----------------------------------------------------------------------------
@@ -461,23 +464,23 @@ typedef enum rrdset_flags {
// (the master data set should be the one that has the same family and is not detail)
RRDSET_FLAG_DEBUG = 1 << 2, // enables or disables debugging for a chart
RRDSET_FLAG_OBSOLETE = 1 << 3, // this is marked by the collector/module as obsolete
- RRDSET_FLAG_EXPORTING_SEND = 1 << 4, // if set, this chart should be sent to Prometheus web API
- RRDSET_FLAG_EXPORTING_IGNORE = 1 << 5, // if set, this chart should not be sent to Prometheus web API
+ RRDSET_FLAG_EXPORTING_SEND = 1 << 4, // if set, this chart should be sent to Prometheus web API and external databases
+ RRDSET_FLAG_EXPORTING_IGNORE = 1 << 5, // if set, this chart should not be sent to Prometheus web API and external databases
RRDSET_FLAG_UPSTREAM_SEND = 1 << 6, // if set, this chart should be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_IGNORE = 1 << 7, // if set, this chart should not be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_EXPOSED = 1 << 8, // if set, we have sent this chart definition to netdata parent (streaming)
RRDSET_FLAG_STORE_FIRST = 1 << 9, // if set, do not eliminate the first collection during interpolation
RRDSET_FLAG_HETEROGENEOUS = 1 << 10, // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
RRDSET_FLAG_HOMOGENEOUS_CHECK = 1 << 11, // if set, the chart should be checked to determine if the dimensions are homogeneous
- RRDSET_FLAG_HIDDEN = 1 << 12, // if set, do not show this chart on the dashboard, but use it for backends
+ RRDSET_FLAG_HIDDEN = 1 << 12, // if set, do not show this chart on the dashboard, but use it for exporting
RRDSET_FLAG_SYNC_CLOCK = 1 << 13, // if set, microseconds on next data collection will be ignored (the chart will be synced to now)
RRDSET_FLAG_OBSOLETE_DIMENSIONS = 1 << 14, // this is marked by the collector/module when a chart has obsolete dimensions
// No new values have been collected for this chart since agent start or it was marked RRDSET_FLAG_OBSOLETE at
// least rrdset_free_obsolete_time seconds ago.
RRDSET_FLAG_ARCHIVED = 1 << 15,
RRDSET_FLAG_ACLK = 1 << 16,
- RRDSET_FLAG_BACKEND_SEND = 1 << 17, // if set, this chart should be sent to backends
- RRDSET_FLAG_BACKEND_IGNORE = 1 << 18 // if set, this chart should not be sent to backends
+ RRDSET_FLAG_PENDING_FOREACH_ALARMS = 1 << 17, // contains dims with uninitialized foreach alarms
+ RRDSET_FLAG_ANOMALY_DETECTION = 1 << 18 // flag to identify anomaly detection charts.
} RRDSET_FLAGS;
#ifdef HAVE_C___ATOMIC
@@ -632,10 +635,11 @@ typedef enum rrdhost_flags {
RRDHOST_FLAG_ORPHAN = 1 << 0, // this host is orphan (not receiving data)
RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS = 1 << 1, // delete files of obsolete charts
RRDHOST_FLAG_DELETE_ORPHAN_HOST = 1 << 2, // delete the entire host when orphan
- RRDHOST_FLAG_BACKEND_SEND = 1 << 3, // send it to backends
- RRDHOST_FLAG_BACKEND_DONT_SEND = 1 << 4, // don't send it to backends
+ RRDHOST_FLAG_EXPORTING_SEND = 1 << 3, // send it to external databases
+ RRDHOST_FLAG_EXPORTING_DONT_SEND = 1 << 4, // don't send it to external databases
RRDHOST_FLAG_ARCHIVED = 1 << 5, // The host is archived, no collected charts yet
RRDHOST_FLAG_MULTIHOST = 1 << 6, // Host belongs to localhost/megadb
+ RRDHOST_FLAG_PENDING_FOREACH_ALARMS = 1 << 7, // contains dims with uninitialized foreach alarms
} RRDHOST_FLAGS;
#ifdef HAVE_C___ATOMIC
@@ -729,6 +733,10 @@ typedef struct alarm_log {
// RRD HOST
struct rrdhost_system_info {
+ char *cloud_provider_type;
+ char *cloud_instance_type;
+ char *cloud_instance_region;
+
char *host_os_name;
char *host_os_id;
char *host_os_id_like;
@@ -756,6 +764,9 @@ struct rrdhost_system_info {
uint16_t hops;
bool ml_capable;
bool ml_enabled;
+ char *install_type;
+ char *prebuilt_arch;
+ char *prebuilt_dist;
};
struct rrdhost {
diff --git a/database/rrdcalctemplate.c b/database/rrdcalctemplate.c
index 67288e9d..9789f4be 100644
--- a/database/rrdcalctemplate.c
+++ b/database/rrdcalctemplate.c
@@ -120,6 +120,9 @@ inline void rrdcalctemplate_free(RRDCALCTEMPLATE *rt) {
freez(rt->name);
freez(rt->exec);
freez(rt->recipient);
+ freez(rt->classification);
+ freez(rt->component);
+ freez(rt->type);
freez(rt->context);
freez(rt->source);
freez(rt->units);
diff --git a/database/rrddim.c b/database/rrddim.c
index 78885df3..df45363b 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -3,32 +3,6 @@
#define NETDATA_RRD_INTERNALS
#include "rrd.h"
-static inline void calc_link_to_rrddim(RRDDIM *rd)
-{
- RRDHOST *host = rd->rrdset->rrdhost;
- RRDSET *st = rd->rrdset;
- if (host->alarms_with_foreach || host->alarms_template_with_foreach) {
- int count = 0;
- int hostlocked;
- for (count = 0; count < 5; count++) {
- hostlocked = netdata_rwlock_trywrlock(&host->rrdhost_rwlock);
- if (!hostlocked) {
- rrdcalc_link_to_rrddim(rd, st, host);
- rrdhost_unlock(host);
- break;
- } else if (hostlocked != EBUSY) {
- error("Cannot lock host to create an alarm for the dimension.");
- }
- sleep_usec(USEC_PER_MS * 200);
- }
-
- if (count == 5) {
- error(
- "Failed to create an alarm for dimension %s of chart %s 5 times. Skipping alarm.", rd->name, st->name);
- }
- }
-}
-
// ----------------------------------------------------------------------------
// RRDDIM index
@@ -73,9 +47,15 @@ inline int rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name) {
snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
rd->name = config_set_default(st->config_section, varname, name);
rd->hash_name = simple_hash(rd->name);
- rrddimvar_rename_all(rd);
+
+ if (!st->state->is_ar_chart)
+ rrddimvar_rename_all(rd);
+
rd->exposed = 0;
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ ml_dimension_update_name(st, rd, name);
+
return 1;
}
@@ -238,7 +218,10 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, RRDVAR_OPTION_DEFAULT);
- calc_link_to_rrddim(rd);
+
+ rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_FOREACH_ALARM);
+ rrdset_flag_set(st, RRDSET_FLAG_PENDING_FOREACH_ALARMS);
+ rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_FOREACH_ALARMS);
}
if (unlikely(rc)) {
debug(D_METADATALOG, "DIMENSION [%s] metadata updated", rd->id);
@@ -252,7 +235,6 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
char filename[FILENAME_MAX + 1];
char fullfilename[FILENAME_MAX + 1];
- char varname[CONFIG_MAX_NAME + 1];
unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
@@ -350,18 +332,14 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->cache_filename = strdupz(fullfilename);
- snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
- rd->name = config_get(st->config_section, varname, (name && *name)?name:rd->id);
+ rd->name = (name && *name)?strdupz(name):strdupz(rd->id);
rd->hash_name = simple_hash(rd->name);
- snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
- rd->algorithm = rrd_algorithm_id(config_get(st->config_section, varname, rrd_algorithm_name(algorithm)));
+ rd->algorithm = algorithm;
- snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
- rd->multiplier = config_get_number(st->config_section, varname, multiplier);
+ rd->multiplier = multiplier;
- snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
- rd->divisor = config_get_number(st->config_section, varname, divisor);
+ rd->divisor = divisor;
if(!rd->divisor) rd->divisor = 1;
rd->entries = st->entries;
@@ -443,7 +421,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
td->next = rd;
}
- if(host->health_enabled) {
+ if(host->health_enabled && !st->state->is_ar_chart) {
rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, RRDVAR_OPTION_DEFAULT);
@@ -452,7 +430,9 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
if(unlikely(rrddim_index_add(st, rd) != rd))
error("RRDDIM: INTERNAL ERROR: attempt to index duplicate dimension '%s' on chart '%s'", rd->id, st->id);
- calc_link_to_rrddim(rd);
+ rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_FOREACH_ALARM);
+ rrdset_flag_set(st, RRDSET_FLAG_PENDING_FOREACH_ALARMS);
+ rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_FOREACH_ALARMS);
ml_new_dimension(rd);
@@ -503,6 +483,10 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
error("RRDDIM: INTERNAL ERROR: attempt to remove from index dimension '%s' on chart '%s', removed a different dimension.", rd->id, st->id);
// free(rd->annotations);
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ if (!netdata_exit)
+ aclk_send_dimension_update(rd);
+#endif
RRD_MEMORY_MODE rrd_memory_mode = rd->rrd_memory_mode;
switch(rrd_memory_mode) {
@@ -511,6 +495,7 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
case RRD_MEMORY_MODE_RAM:
debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
freez((void *)rd->id);
+ freez((void *)rd->name);
freez(rd->cache_filename);
freez(rd->state);
munmap(rd, rd->memsize);
@@ -521,6 +506,7 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
case RRD_MEMORY_MODE_DBENGINE:
debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
freez((void *)rd->id);
+ freez((void *)rd->name);
freez(rd->cache_filename);
freez(rd->state);
freez(rd);
@@ -546,6 +532,7 @@ int rrddim_hide(RRDSET *st, const char *id) {
error("Cannot find dimension with id '%s' on stats '%s' (%s) on host '%s'.", id, st->name, st->id, host->hostname);
return 1;
}
+ (void) sql_set_dimension_option(&rd->state->metric_uuid, "hidden");
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
#ifdef ENABLE_ACLK
@@ -563,6 +550,7 @@ int rrddim_unhide(RRDSET *st, const char *id) {
error("Cannot find dimension with id '%s' on stats '%s' (%s) on host '%s'.", id, st->name, st->id, host->hostname);
return 1;
}
+ (void) sql_set_dimension_option(&rd->state->metric_uuid, NULL);
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
#ifdef ENABLE_ACLK
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 79e283a6..649736ca 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -385,17 +385,15 @@ RRDHOST *rrdhost_create(const char *hostname,
// ------------------------------------------------------------------------
// init new ML host and update system_info to let upstreams know
// about ML functionality
+ //
- ml_new_host(host);
if (is_localhost && host->system_info) {
-#ifndef ENABLE_ML
- host->system_info->ml_capable = 0;
-#else
- host->system_info->ml_capable = 1;
-#endif
- host->system_info->ml_enabled = host->ml_host != NULL;
+ host->system_info->ml_capable = ml_capable();
+ host->system_info->ml_enabled = ml_enabled(host);
}
+ ml_new_host(host);
+
info("Host '%s' (at registry as '%s') with guid '%s' initialized"
", os '%s'"
", timezone '%s'"
@@ -642,8 +640,6 @@ RRDHOST *rrdhost_find_or_create(
rrdhost_unlock(host);
}
- rrdhost_cleanup_orphan_hosts_nolock(host);
-
rrd_unlock();
return host;
@@ -652,7 +648,7 @@ inline int rrdhost_should_be_removed(RRDHOST *host, RRDHOST *protected_host, tim
if(host != protected_host
&& host != localhost
&& rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)
- && host->receiver
+ && !host->receiver
&& host->senders_disconnected_time
&& host->senders_disconnected_time + rrdhost_free_orphan_time < now)
return 1;
@@ -722,7 +718,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) {
, netdata_configured_timezone
, netdata_configured_abbrev_timezone
, netdata_configured_utc_offset
- , config_get(CONFIG_SECTION_BACKEND, "host tags", "")
+ , ""
, program_name
, program_version
, default_rrd_update_every
@@ -810,6 +806,9 @@ void rrdhost_system_info_free(struct rrdhost_system_info *system_info) {
info("SYSTEM_INFO: free %p", system_info);
if(likely(system_info)) {
+ freez(system_info->cloud_provider_type);
+ freez(system_info->cloud_instance_type);
+ freez(system_info->cloud_instance_region);
freez(system_info->host_os_name);
freez(system_info->host_os_id);
freez(system_info->host_os_id_like);
@@ -834,6 +833,9 @@ void rrdhost_system_info_free(struct rrdhost_system_info *system_info) {
freez(system_info->container);
freez(system_info->container_detection);
freez(system_info->is_k8s_node);
+ freez(system_info->install_type);
+ freez(system_info->prebuilt_arch);
+ freez(system_info->prebuilt_dist);
freez(system_info);
}
}
@@ -846,6 +848,10 @@ void rrdhost_free(RRDHOST *host) {
rrd_check_wrlock(); // make sure the RRDs are write locked
+ rrdhost_wrlock(host);
+ ml_delete_host(host);
+ rrdhost_unlock(host);
+
// ------------------------------------------------------------------------
// clean up streaming
rrdpush_sender_thread_stop(host); // stop a possibly running thread
@@ -877,7 +883,20 @@ void rrdhost_free(RRDHOST *host) {
rrdhost_wrlock(host); // lock this RRDHOST
-
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ struct aclk_database_worker_config *wc = host->dbsync_worker;
+ if (wc && !netdata_exit) {
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_ORPHAN_HOST;
+ struct aclk_completion compl ;
+ init_aclk_completion(&compl );
+ cmd.completion = &compl ;
+ aclk_database_enq_cmd(wc, &cmd);
+ wait_for_aclk_completion(&compl );
+ destroy_aclk_completion(&compl );
+ }
+#endif
// ------------------------------------------------------------------------
// release its children resources
@@ -922,8 +941,6 @@ void rrdhost_free(RRDHOST *host) {
rrdeng_exit(host->rrdeng_ctx);
#endif
- ml_delete_host(host);
-
// ------------------------------------------------------------------------
// remove it from the indexes
@@ -980,7 +997,10 @@ void rrdhost_free(RRDHOST *host) {
freez(host->node_id);
freez(host);
-
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ if (wc)
+ wc->is_orphan = 0;
+#endif
rrd_hosts_available--;
}
@@ -1019,6 +1039,18 @@ static struct label *rrdhost_load_auto_labels(void)
{
struct label *label_list = NULL;
+ if (localhost->system_info->cloud_provider_type)
+ label_list =
+ add_label_to_list(label_list, "_cloud_provider_type", localhost->system_info->cloud_provider_type, LABEL_SOURCE_AUTO);
+
+ if (localhost->system_info->cloud_instance_type)
+ label_list =
+ add_label_to_list(label_list, "_cloud_instance_type", localhost->system_info->cloud_instance_type, LABEL_SOURCE_AUTO);
+
+ if (localhost->system_info->cloud_instance_region)
+ label_list =
+ add_label_to_list(label_list, "_cloud_instance_region", localhost->system_info->cloud_instance_region, LABEL_SOURCE_AUTO);
+
if (localhost->system_info->host_os_name)
label_list =
add_label_to_list(label_list, "_os_name", localhost->system_info->host_os_name, LABEL_SOURCE_AUTO);
@@ -1071,6 +1103,18 @@ static struct label *rrdhost_load_auto_labels(void)
label_list =
add_label_to_list(label_list, "_is_k8s_node", localhost->system_info->is_k8s_node, LABEL_SOURCE_AUTO);
+ if (localhost->system_info->install_type)
+ label_list =
+ add_label_to_list(label_list, "_install_type", localhost->system_info->install_type, LABEL_SOURCE_AUTO);
+
+ if (localhost->system_info->prebuilt_arch)
+ label_list =
+ add_label_to_list(label_list, "_prebuilt_arch", localhost->system_info->prebuilt_arch, LABEL_SOURCE_AUTO);
+
+ if (localhost->system_info->prebuilt_dist)
+ label_list =
+ add_label_to_list(label_list, "_prebuilt_dist", localhost->system_info->prebuilt_dist, LABEL_SOURCE_AUTO);
+
label_list = add_aclk_host_labels(label_list);
label_list = add_label_to_list(
@@ -1206,50 +1250,6 @@ struct label *parse_json_tags(struct label *label_list, const char *tags)
return label_list;
}
-static struct label *rrdhost_load_labels_from_tags(void)
-{
- if (!localhost->tags)
- return NULL;
-
- struct label *label_list = NULL;
- BACKEND_TYPE type = BACKEND_TYPE_UNKNOWN;
-
- if (config_exists(CONFIG_SECTION_BACKEND, "enabled")) {
- if (config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", CONFIG_BOOLEAN_NO) != CONFIG_BOOLEAN_NO) {
- const char *type_name = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
- type = backend_select_type(type_name);
- }
- }
-
- switch (type) {
- case BACKEND_TYPE_GRAPHITE:
- label_list = parse_simple_tags(
- label_list, localhost->tags, '=', ';', DO_NOT_STRIP_QUOTES, DO_NOT_STRIP_QUOTES,
- DO_NOT_SKIP_ESCAPED_CHARACTERS);
- break;
- case BACKEND_TYPE_OPENTSDB_USING_TELNET:
- label_list = parse_simple_tags(
- label_list, localhost->tags, '=', ' ', DO_NOT_STRIP_QUOTES, DO_NOT_STRIP_QUOTES,
- DO_NOT_SKIP_ESCAPED_CHARACTERS);
- break;
- case BACKEND_TYPE_OPENTSDB_USING_HTTP:
- label_list = parse_simple_tags(
- label_list, localhost->tags, ':', ',', STRIP_QUOTES, STRIP_QUOTES,
- DO_NOT_SKIP_ESCAPED_CHARACTERS);
- break;
- case BACKEND_TYPE_JSON:
- label_list = parse_json_tags(label_list, localhost->tags);
- break;
- default:
- label_list = parse_simple_tags(
- label_list, localhost->tags, '=', ',', DO_NOT_STRIP_QUOTES, STRIP_QUOTES,
- DO_NOT_SKIP_ESCAPED_CHARACTERS);
- break;
- }
-
- return label_list;
-}
-
static struct label *rrdhost_load_kubernetes_labels(void)
{
struct label *l=NULL;
@@ -1313,10 +1313,8 @@ void reload_host_labels(void)
struct label *from_auto = rrdhost_load_auto_labels();
struct label *from_k8s = rrdhost_load_kubernetes_labels();
struct label *from_config = rrdhost_load_config_labels();
- struct label *from_tags = rrdhost_load_labels_from_tags();
struct label *new_labels = merge_label_lists(from_auto, from_k8s);
- new_labels = merge_label_lists(new_labels, from_tags);
new_labels = merge_label_lists(new_labels, from_config);
rrdhost_rdlock(localhost);
@@ -1489,6 +1487,11 @@ restart_after_removal:
}
continue;
}
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ else {
+ aclk_send_dimension_update(rd);
+ }
+#endif
}
last = rd;
rd = rd->next;
@@ -1518,6 +1521,10 @@ restart_after_removal:
rrdset_free(st);
goto restart_after_removal;
}
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ else
+ sql_check_chart_liveness(st);
+#endif
}
}
@@ -1553,6 +1560,18 @@ int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, ch
if (!strcmp(name, "NETDATA_PROTOCOL_VERSION"))
return res;
+ else if(!strcmp(name, "NETDATA_INSTANCE_CLOUD_TYPE")){
+ freez(system_info->cloud_provider_type);
+ system_info->cloud_provider_type = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE")){
+ freez(system_info->cloud_instance_type);
+ system_info->cloud_instance_type = strdupz(value);
+ }
+ else if(!strcmp(name, "NETDATA_INSTANCE_CLOUD_INSTANCE_REGION")){
+ freez(system_info->cloud_instance_region);
+ system_info->cloud_instance_region = strdupz(value);
+ }
else if(!strcmp(name, "NETDATA_CONTAINER_OS_NAME")){
freez(system_info->container_os_name);
system_info->container_os_name = strdupz(value);
@@ -1580,6 +1599,7 @@ int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, ch
else if(!strcmp(name, "NETDATA_HOST_OS_NAME")){
freez(system_info->host_os_name);
system_info->host_os_name = strdupz(value);
+ json_fix_string(system_info->host_os_name);
}
else if(!strcmp(name, "NETDATA_HOST_OS_ID")){
freez(system_info->host_os_id);
diff --git a/database/rrdset.c b/database/rrdset.c
index 19af449d..f8e471be 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -144,25 +144,38 @@ int rrdset_set_name(RRDSET *st, const char *name) {
debug(D_RRD_CALLS, "rrdset_set_name() old: '%s', new: '%s'", st->name?st->name:"", name);
- char b[CONFIG_MAX_VALUE + 1];
- char n[RRD_ID_LENGTH_MAX + 1];
+ char full_name[RRD_ID_LENGTH_MAX + 1];
+ char sanitized_name[CONFIG_MAX_VALUE + 1];
+ char new_name[CONFIG_MAX_VALUE + 1];
- snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
- rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
+ snprintfz(full_name, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
+ rrdset_strncpyz_name(sanitized_name, full_name, CONFIG_MAX_VALUE);
+ strncpyz(new_name, sanitized_name, CONFIG_MAX_VALUE);
- if(rrdset_index_find_name(host, b, 0)) {
- info("RRDSET: chart name '%s' on host '%s' already exists.", b, host->hostname);
- return 0;
+ if(rrdset_index_find_name(host, new_name, 0)) {
+ debug(D_RRD_CALLS, "RRDSET: chart name '%s' on host '%s' already exists.", new_name, host->hostname);
+ if(!strcmp(st->id, full_name) && !st->name) {
+ unsigned i = 1;
+
+ do {
+ snprintfz(new_name, CONFIG_MAX_VALUE, "%s_%u", sanitized_name, i);
+ i++;
+ } while (rrdset_index_find_name(host, new_name, 0));
+
+ info("RRDSET: using name '%s' for chart '%s' on host '%s'.", new_name, full_name, host->hostname);
+ } else {
+ return 0;
+ }
}
if(st->name) {
rrdset_index_del_name(host, st);
- st->name = config_set_default(st->config_section, "name", b);
+ st->name = strdupz(new_name);
st->hash_name = simple_hash(st->name);
rrdsetvar_rename_all(st);
}
else {
- st->name = config_get(st->config_section, "name", b);
+ st->name = strdupz(new_name);
st->hash_name = simple_hash(st->name);
}
@@ -177,8 +190,6 @@ int rrdset_set_name(RRDSET *st, const char *name) {
rrdset_flag_clear(st, RRDSET_FLAG_EXPORTING_SEND);
rrdset_flag_clear(st, RRDSET_FLAG_EXPORTING_IGNORE);
- rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_SEND);
- rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -384,6 +395,13 @@ void rrdset_free(RRDSET *st) {
netdata_rwlock_destroy(&st->state->labels.labels_rwlock);
// free directly allocated members
+ freez((void *)st->name);
+ freez(st->type);
+ freez(st->family);
+ freez(st->title);
+ freez(st->units);
+ freez(st->context);
+ freez(st->cache_dir);
freez(st->config_section);
freez(st->plugin_name);
freez(st->module_name);
@@ -617,10 +635,8 @@ RRDSET *rrdset_create_custom(
mark_rebuild |= META_CHART_UPDATED;
}
- RRDSET_TYPE new_chart_type =
- rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type)));
- if (st->chart_type != new_chart_type) {
- st->chart_type = new_chart_type;
+ if (st->chart_type != chart_type) {
+ st->chart_type = chart_type;
mark_rebuild |= META_CHART_UPDATED;
}
@@ -709,19 +725,11 @@ RRDSET *rrdset_create_custom(
// get the options from the config, we need to create it
long entries;
- if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
- // only sets it the first time
- entries = config_get_number(config_section, "history", 5);
- } else {
- long rentries = config_get_number(config_section, "history", history_entries);
- entries = align_entries_to_pagesize(memory_mode, rentries);
- if (entries != rentries) entries = config_set_number(config_section, "history", entries);
-
- if (memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
- entries = config_set_number(config_section, "history", 10);
- }
int enabled = config_get_boolean(config_section, "enabled", 1);
- if(!enabled) entries = 5;
+ if(!enabled || memory_mode == RRD_MEMORY_MODE_DBENGINE)
+ entries = 5;
+ else
+ entries = align_entries_to_pagesize(memory_mode, history_entries);
unsigned long size = sizeof(RRDSET);
char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
@@ -840,22 +848,25 @@ RRDSET *rrdset_create_custom(
st->cache_dir = cache_dir;
- st->chart_type = rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type)));
- st->type = config_get(st->config_section, "type", type);
+ st->chart_type = chart_type;
+ st->type = strdupz(type);
st->state = callocz(1, sizeof(*st->state));
- st->family = config_get(st->config_section, "family", family?family:st->type);
+
+ st->family = family ? strdupz(family) : strdupz(st->type);
json_fix_string(st->family);
- st->units = config_get(st->config_section, "units", units?units:"");
+ st->state->is_ar_chart = strcmp(st->id, ML_ANOMALY_RATES_CHART_ID) == 0;
+
+ st->units = units ? strdupz(units) : strdupz("");
json_fix_string(st->units);
- st->context = config_get(st->config_section, "context", context?context:st->id);
+ st->context = context ? strdupz(context) : strdupz(st->id);
st->state->old_context = strdupz(st->context);
json_fix_string(st->context);
st->hash_context = simple_hash(st->context);
- st->priority = config_get_number(st->config_section, "priority", priority);
+ st->priority = priority;
if(enabled)
rrdset_flag_set(st, RRDSET_FLAG_ENABLED);
else
@@ -866,8 +877,6 @@ RRDSET *rrdset_create_custom(
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
rrdset_flag_clear(st, RRDSET_FLAG_EXPORTING_SEND);
rrdset_flag_clear(st, RRDSET_FLAG_EXPORTING_IGNORE);
- rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_SEND);
- rrdset_flag_clear(st, RRDSET_FLAG_BACKEND_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -905,7 +914,7 @@ RRDSET *rrdset_create_custom(
// could not use the name, use the id
rrdset_set_name(st, id);
- st->title = config_get(st->config_section, "title", title);
+ st->title = strdupz(title);
st->state->old_title = strdupz(st->title);
json_fix_string(st->title);
@@ -1392,9 +1401,9 @@ void rrdset_done(RRDSET *st) {
rrdset_rdlock(st);
#ifdef ENABLE_ACLK
- if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
- if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT && st->dimensions) {
- if (likely(!queue_chart_to_aclk(st)))
+ if (likely(!st->state->is_ar_chart)) {
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st)))
rrdset_flag_set(st, RRDSET_FLAG_ACLK);
}
}
@@ -1475,7 +1484,14 @@ void rrdset_done(RRDSET *st) {
// check if we will re-write the entire page
if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) {
- info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
+ info(
+ "%s: too old data (last updated at %" PRId64 ".%" PRId64 ", last collected at %" PRId64 ".%" PRId64 "). "
+ "Resetting it. Will not store the next entry.",
+ st->name,
+ (int64_t)st->last_updated.tv_sec,
+ (int64_t)st->last_updated.tv_usec,
+ (int64_t)st->last_collected_time.tv_sec,
+ (int64_t)st->last_collected_time.tv_usec);
rrdset_reset(st);
rrdset_init_last_updated_time(st);
@@ -1815,8 +1831,10 @@ after_second_database_work:
continue;
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ if (likely(!st->state->is_ar_chart)) {
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
- int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
+ int live =
+ ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
if (unlikely(live != rd->state->aclk_live_status)) {
if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (likely(!queue_dimension_to_aclk(rd))) {
@@ -1826,6 +1844,7 @@ after_second_database_work:
}
}
}
+ }
#endif
if(unlikely(!rd->updated))
continue;
@@ -1926,6 +1945,9 @@ after_second_database_work:
delete_dimension_uuid(&rd->state->metric_uuid);
} else {
/* Do not delete this dimension */
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ aclk_send_dimension_update(rd);
+#endif
last = rd;
rd = rd->next;
continue;
@@ -2021,3 +2043,39 @@ struct label *rrdset_lookup_label_key(RRDSET *st, char *key, uint32_t key_hash)
}
return ret;
}
+
+static inline int k8s_space(char c) {
+ switch(c) {
+ case ':':
+ case ',':
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+int rrdset_matches_label_keys(RRDSET *st, char *keylist, char *words[], uint32_t *hash_key_list, int *word_count, int size)
+{
+ struct label_index *labels = &st->state->labels;
+
+ if (!labels->head)
+ return 0;
+
+ struct label *one_label;
+
+ if (!*word_count) {
+ *word_count = quoted_strings_splitter(keylist, words, size, k8s_space, NULL, NULL, 0);
+ for (int i = 0; i < *word_count - 1; i += 2) {
+ hash_key_list[i] = simple_hash(words[i]);
+ }
+ }
+
+ int ret = 1;
+ netdata_rwlock_rdlock(&labels->labels_rwlock);
+ for (int i = 0; ret && i < *word_count - 1; i += 2) {
+ one_label = label_list_lookup_key(labels->head, words[i], hash_key_list[i]);
+ ret = (one_label && !strcmp(one_label->value, words[i + 1]));
+ }
+ netdata_rwlock_unlock(&labels->labels_rwlock);
+ return ret;
+}
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 63196a81..98932809 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -209,7 +209,7 @@ struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id)
void aclk_sync_exit_all()
{
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = localhost;
while(host) {
struct aclk_database_worker_config *wc = host->dbsync_worker;
@@ -508,11 +508,16 @@ void aclk_database_worker(void *arg)
aclk_update_retention(wc, cmd);
aclk_process_dimension_deletion(wc, cmd);
break;
-#endif
// NODE_INSTANCE DETECTION
+ case ACLK_DATABASE_ORPHAN_HOST:
+ wc->host = NULL;
+ wc->is_orphan = 1;
+ aclk_add_worker_thread(wc);
+ break;
+#endif
case ACLK_DATABASE_TIMER:
- if (unlikely(localhost && !wc->host)) {
+ if (unlikely(localhost && !wc->host && !wc->is_orphan)) {
if (claimed()) {
wc->host = rrdhost_find_by_guid(wc->host_guid, 0);
if (wc->host) {
@@ -567,7 +572,7 @@ void aclk_database_worker(void *arg)
freez(loop);
- rrd_wrlock();
+ rrd_rdlock();
if (likely(wc->host))
wc->host->dbsync_worker = NULL;
freez(wc);
@@ -815,7 +820,7 @@ void aclk_data_rotated(void)
return;
time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *this_host = localhost;
while (this_host) {
struct aclk_database_worker_config *wc = this_host->dbsync_worker;
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 42494974..894d9348 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -125,6 +125,7 @@ enum aclk_database_opcode {
ACLK_DATABASE_CHART_ACK,
ACLK_DATABASE_UPD_RETENTION,
ACLK_DATABASE_DIM_DELETION,
+ ACLK_DATABASE_ORPHAN_HOST,
#endif
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
@@ -194,6 +195,7 @@ struct aclk_database_worker_config {
int chart_pending;
int chart_reset_count;
volatile unsigned is_shutting_down;
+ volatile unsigned is_orphan;
struct aclk_database_worker_config *next;
};
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 238b500a..54e8be4a 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -127,7 +127,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
int rc;
if (unlikely(!wc->alert_updates)) {
- log_access("AC [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@@ -135,6 +135,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
if (unlikely(!claim_id))
return;
+ if (unlikely(!wc->host)) {
+ freez(claim_id);
+ return;
+ }
+
BUFFER *sql = buffer_create(1024);
if (wc->alerts_start_seq_id != 0) {
@@ -242,7 +247,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18));
+ alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
+ strdupz((char *)"") :
+ strdupz((char *)sqlite3_column_text(res, 18));
aclk_send_alarm_log_entry(&alarm_log);
@@ -267,7 +274,13 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
db_execute(buffer_tostring(sql));
} else {
if (log_first_sequence_id)
- log_access("OG [%s (%s)]: Sent alert events, first sequence_id %"PRIu64", last sequence_id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", log_first_sequence_id, log_last_sequence_id);
+ log_access(
+ "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ log_first_sequence_id,
+ log_last_sequence_id,
+ wc->alerts_batch_id);
log_first_sequence_id = 0;
log_last_sequence_id = 0;
}
@@ -288,23 +301,32 @@ void aclk_send_alarm_health_log(char *node_id)
if (unlikely(!node_id))
return;
- log_access("IN [%s (N/A)]: Request to send alarm health log.", node_id);
+ char *hostname= NULL;
struct aclk_database_worker_config *wc = NULL;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
+ if (likely(host)) {
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ hostname = host->hostname;
+ }
+ else
+ hostname = get_hostname_by_node_id(node_id);
rrd_unlock();
+
+ log_access("ACLK REQ [%s (%s)]: HEALTH LOG request received", node_id, hostname ? hostname : "N/A");
+ if (unlikely(!host))
+ freez(hostname);
+
if (wc)
aclk_database_enq_cmd(wc, &cmd);
else {
if (aclk_worker_enq_cmd(node_id, &cmd))
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
}
return;
}
@@ -323,7 +345,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
RRDHOST *host = wc->host;
if (unlikely(!host)) {
- rrd_wrlock();
+ rrd_rdlock();
host = find_host_by_node_id(wc->node_id);
rrd_unlock();
@@ -391,7 +413,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
wc->alert_sequence_id = last_sequence;
aclk_send_alarm_log_health(&alarm_log);
- log_access("OG [%s (%s)]: Alarm health log sent, first sequence id %"PRIu64", last sequence id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
+ log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
@@ -415,7 +437,7 @@ void aclk_send_alarm_configuration(char *config_hash)
return;
}
- log_access("IN [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -525,14 +547,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
}
if (likely(p_alarm_config.cfg_hash)) {
- log_access("OG [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
freez((char *) cmd.data_param);
freez(p_alarm_config.cfg_hash);
destroy_aclk_alarm_configuration(&alarm_config);
}
else
- log_access("AC [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
+ log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
bind_fail:
rc = sqlite3_finalize(res);
@@ -552,28 +574,30 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
if (unlikely(!node_id))
return;
- log_access("IN [%s (N/A)]: Start streaming alerts with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, batch_id, start_seq_id);
+ //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
+ rrd_unlock();
+ if (likely(host)) {
wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
(struct aclk_database_worker_config *)host->dbsync_worker :
(struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
- rrd_unlock();
- if (unlikely(!host->health_enabled)) {
- log_access("AC [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
- return;
- }
+ if (unlikely(!host->health_enabled)) {
+ log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ return;
+ }
+ } else
+ wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
if (likely(wc)) {
- log_access("AC [%s (%s)]: Start streaming alerts enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, wc->host ? wc->host->hostname : "N/A", batch_id, start_seq_id);
+ log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id);
__sync_synchronize();
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
@@ -581,7 +605,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
__sync_synchronize();
}
else
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
@@ -607,7 +631,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
db_execute(buffer_tostring(sql));
- log_access("AC [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_free(sql);
#endif
@@ -644,7 +668,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
return;
struct aclk_database_worker_config *wc = NULL;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
if (likely(host))
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
@@ -657,6 +681,8 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
wc->host ? wc->host->hostname : "N/A",
snapshot_id,
sequence_id);
+ if (wc->alerts_snapshot_id == snapshot_id)
+ return;
__sync_synchronize();
wc->alerts_snapshot_id = snapshot_id;
wc->alerts_ack_sequence_id = sequence_id;
@@ -669,7 +695,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
} else
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
UNUSED(snapshot_id);
@@ -714,7 +740,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->utc_offset = host->utc_offset;
alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
- alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : "";
+ alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)"");
alarm_log->command = strdupz((char *)edit_command);
@@ -738,7 +764,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;
alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
- alarm_log->rendered_info = strdupz(ae->info);
+ alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");
freez(edit_command);
}
@@ -770,7 +796,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
UNUSED(cmd);
// we perhaps we don't need this for snapshots
if (unlikely(!wc->alert_updates)) {
- log_access("AC [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@@ -779,11 +805,14 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
return;
}
+ if (unlikely(!wc->alerts_snapshot_id))
+ return;
+
char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;
- log_access("OG [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
+ log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
@@ -906,3 +935,44 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
UNUSED(host);
#endif
}
+
+int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
+{
+ int rc;
+ struct aclk_database_worker_config *wc = NULL;
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (!wc)
+ return 1;
+
+ proto_alert_status->alert_updates = wc->alert_updates;
+ proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
+
+ BUFFER *sql = buffer_create(1024);
+ sqlite3_stmt *res = NULL;
+
+ buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
+ "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to get alert log status from the database.");
+ buffer_free(sql);
+ return 1;
+ }
+
+ while (sqlite3_step(res) == SQLITE_ROW) {
+ proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
+ proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
+ proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
+ proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
+
+ buffer_free(sql);
+ return 0;
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index 1aaaa5d2..957cb94a 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -5,6 +5,15 @@
extern sqlite3 *db_meta;
+struct proto_alert_status {
+ int alert_updates;
+ uint64_t alerts_batch_id;
+ uint64_t last_acked_sequence_id;
+ uint64_t pending_min_sequence_id;
+ uint64_t pending_max_sequence_id;
+ uint64_t last_submitted_sequence_id;
+};
+
int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_send_alarm_health_log(char *node_id);
@@ -16,5 +25,6 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
+int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
#endif //NETDATA_SQLITE_ACLK_ALERT_H
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index eea48a56..7afa1d45 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -3,12 +3,12 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_chart.h"
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#include "../../aclk/aclk_charts_api.h"
#include "../../aclk/aclk.h"
-static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc,
- void *data, enum aclk_database_opcode opcode)
+static inline int
+sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum aclk_database_opcode opcode)
{
int rc;
if (unlikely(!wc))
@@ -29,23 +29,22 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl
int send_status = 0;
if (unlikely(!res)) {
- BUFFER *sql = buffer_create(1024);
- buffer_sprintf(sql,"SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
"WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;",
uuid_str, uuid_str);
- rc = prepare_statement(db_meta, (char *) buffer_tostring(sql), &res);
- buffer_free(sql);
+ rc = prepare_statement(db_meta, sql, &res);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to check payload data");
return 0;
}
}
- rc = sqlite3_bind_blob(res, 1, uuid , sizeof(*uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, 1, uuid, sizeof(*uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_blob(res, 2, payload , payload_size, SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, 2, payload, payload_size, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -60,24 +59,23 @@ bind_fail:
}
static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
- ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size)
+ ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status)
{
static __thread sqlite3_stmt *res_chart = NULL;
int rc;
rc = payload_sent(wc->uuid_str, uuid, payload, payload_size);
+ if (send_status)
+ *send_status = rc;
if (rc == 1)
return 0;
if (unlikely(!res_chart)) {
- BUFFER *sql = buffer_create(1024);
-
- buffer_sprintf(sql,"INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
- "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
-
- rc = prepare_statement(db_meta, (char *) buffer_tostring(sql), &res_chart);
- buffer_free(sql);
-
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,
+ "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
+ "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
+ rc = prepare_statement(db_meta, sql, &res_chart);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to store chart payload data");
return 1;
@@ -91,15 +89,15 @@ static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t
if (uuid_parse(claim_id, claim_uuid))
return 1;
- rc = sqlite3_bind_blob(res_chart, 1, &unique_uuid , sizeof(unique_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res_chart, 1, &unique_uuid, sizeof(unique_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_blob(res_chart, 2, uuid , sizeof(*uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res_chart, 2, uuid, sizeof(*uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_blob(res_chart, 3, &claim_uuid , sizeof(claim_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res_chart, 3, &claim_uuid, sizeof(claim_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -127,7 +125,6 @@ bind_fail:
return (rc != SQLITE_DONE);
}
-
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc = 0;
@@ -162,7 +159,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
size_t size;
char *payload = generate_chart_instance_updated(&size, &chart_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size);
+ rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL);
freez(payload);
chart_instance_updated_destroy(&chart_payload);
}
@@ -170,7 +167,8 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
}
static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
- const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time)
+ const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time,
+ int *send_status)
{
int rc = 0;
size_t size;
@@ -185,6 +183,9 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
if (!first_time)
info("Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]",
wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id);
+ if (last_time)
+ info("Host %s (node %s) stopped collecting dimension id=[%s] name=[%s] chart=[%s] %ld seconds ago at %ld",
+ wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id, now_realtime_sec() - last_time, last_time);
#endif
dim_payload.node_id = wc->node_id;
@@ -196,7 +197,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size);
+ rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status);
freez(payload);
return rc;
}
@@ -220,8 +221,13 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
if (!claim_id)
return;
- rc = sqlite3_prepare_v2(db_meta, "DELETE FROM dimension_delete where host_id = @host_id " \
- "RETURNING dimension_id, dimension_name, chart_type_id, dim_id LIMIT 10;", -1, &res, 0);
+ rc = sqlite3_prepare_v2(
+ db_meta,
+ "DELETE FROM dimension_delete where host_id = @host_id "
+ "RETURNING dimension_id, dimension_name, chart_type_id, dim_id LIMIT 10;",
+ -1,
+ &res,
+ 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to delete dimension deletes");
@@ -229,7 +235,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
return;
}
- rc = sqlite3_bind_blob(res, 1, &host_id , sizeof(host_id), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, 1, &host_id, sizeof(host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -243,7 +249,8 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
(const char *)sqlite3_column_text(res, 1),
(const char *)sqlite3_column_text(res, 2),
0,
- 0);
+ 0,
+ NULL);
count++;
}
@@ -272,12 +279,13 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
RRDDIM *rd = cmd.data;
if (likely(claim_id)) {
+ int send_status = 0;
time_t now = now_realtime_sec();
time_t first_t = rd->state->query_ops.oldest_time(rd);
- time_t last_t = rd->state->query_ops.latest_time(rd);
+ time_t last_t = rd->state->query_ops.latest_time(rd);
- int live = ((now - last_t) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
+ int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
rc = aclk_upd_dimension_event(
wc,
@@ -287,7 +295,11 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
rd->name,
rd->rrdset->id,
first_t,
- live ? 0 : last_t);
+ live ? 0 : last_t,
+ &send_status);
+
+ if (!send_status)
+ rd->state->aclk_live_status = live;
freez(claim_id);
}
@@ -295,14 +307,16 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
return rc;
}
-
void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc;
wc->chart_pending = 0;
if (unlikely(!wc->chart_updates)) {
- log_access("AC [%s (%s)]: Ignoring chart push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access(
+ "ACLK STA [%s (%s)]: Ignoring chart push event, updates have been turned off for this node.",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A");
return;
}
@@ -320,32 +334,31 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
uint64_t last_sequence;
time_t last_timestamp = 0;
- BUFFER *sql = buffer_create(1024);
-
- sqlite3_stmt *res = NULL;
-
- buffer_sprintf(sql, "SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \
- "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \
- "WHERE ac.date_submitted IS NULL AND ac.unique_id = acp.unique_id AND ac.update_count > 0 " \
- "AND acp.claim_id = @claim_id ORDER BY ac.sequence_id ASC LIMIT %d;", wc->uuid_str, wc->uuid_str, limit);
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ static __thread sqlite3_stmt *res = NULL;
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
- if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement when trying to send a chart update via ACLK");
- buffer_free(sql);
- freez(claim_id);
- return;
+ if (unlikely(!res)) {
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \
+ "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \
+ "WHERE ac.date_submitted IS NULL AND ac.unique_id = acp.unique_id AND ac.update_count > 0 " \
+ "AND acp.claim_id = @claim_id ORDER BY ac.sequence_id ASC LIMIT %d;", wc->uuid_str, wc->uuid_str, limit);
+ rc = prepare_statement(db_meta, sql, &res);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to send a chart update via ACLK");
+ freez(claim_id);
+ return;
+ }
}
- rc = sqlite3_bind_blob(res, 1, claim_uuid , sizeof(claim_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, 1, claim_uuid, sizeof(claim_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- char **payload_list = callocz(limit+1, sizeof(char *));
- size_t *payload_list_size = callocz(limit+1, sizeof(size_t));
- size_t *payload_list_max_size = callocz(limit+1, sizeof(size_t));
- struct aclk_message_position *position_list = callocz(limit+1, sizeof(*position_list));
- int *is_dim = callocz(limit+1, sizeof(*is_dim));
+ char **payload_list = callocz(limit + 1, sizeof(char *));
+ size_t *payload_list_size = callocz(limit + 1, sizeof(size_t));
+ size_t *payload_list_max_size = callocz(limit + 1, sizeof(size_t));
+ struct aclk_message_position *position_list = callocz(limit + 1, sizeof(*position_list));
+ int *is_dim = callocz(limit + 1, sizeof(*is_dim));
int loop = cmd.param1;
@@ -388,29 +401,31 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
error_report("Failed to reset statement when pushing chart events, rc = %d", rc);
if (likely(first_sequence)) {
- buffer_flush(sql);
db_lock();
- buffer_sprintf(sql, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=strftime('%%s','now') "
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=strftime('%%s','now') "
"WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
wc->uuid_str, first_sequence, last_sequence);
- db_execute(buffer_tostring(sql));
-
- buffer_flush(sql);
- buffer_sprintf(sql, "INSERT OR REPLACE INTO aclk_chart_latest_%s (uuid, unique_id, date_submitted) "
+ db_execute(sql);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "INSERT OR REPLACE INTO aclk_chart_latest_%s (uuid, unique_id, date_submitted) "
" SELECT uuid, unique_id, date_submitted FROM aclk_chart_%s s "
" WHERE date_submitted IS NOT NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64
" ;",
wc->uuid_str, wc->uuid_str, first_sequence, last_sequence);
- db_execute(buffer_tostring(sql));
+ db_execute(sql);
db_unlock();
aclk_chart_inst_and_dim_update(payload_list, payload_list_size, is_dim, position_list, wc->batch_id);
- log_access("OG [%s (%s)]: Sending charts and dimensions update, batch_id %"PRIu64", first sequence %"PRIu64", last sequence %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->batch_id, first_sequence, last_sequence);
+ log_access(
+ "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ first_sequence,
+ last_sequence,
+ wc->batch_id);
wc->chart_sequence_id = last_sequence;
wc->chart_timestamp = last_timestamp;
- }
- else
+ } else
break;
--loop;
}
@@ -419,11 +434,14 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
time_t now = now_realtime_sec();
if (wc->rotation_after > now && wc->rotation_after < now + ACLK_DATABASE_ROTATION_DELAY)
wc->rotation_after = now + ACLK_DATABASE_ROTATION_DELAY;
- }
- else {
+ } else {
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
- log_access("AC [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, wc->host ? wc->host->hostname : "N/A", now_realtime_sec() - wc->startup_time);
+ log_access(
+ "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ now_realtime_sec() - wc->startup_time);
}
for (int i = 0; i <= limit; ++i)
@@ -436,11 +454,10 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
freez(is_dim);
bind_fail:
- rc = sqlite3_finalize(res);
+ rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize statement when pushing chart events, rc = %d", rc);
+ error_report("Failed to reset statement when pushing chart events, rc = %d", rc);
- buffer_free(sql);
freez(claim_id);
return;
}
@@ -496,40 +513,44 @@ int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_d
}
if (likely(chart_config.config_hash)) {
- log_access("OG [%s (%s)]: Sending chart config for %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id);
+ log_access(
+ "ACLK REQ [%s (%s)]: Sending chart config for %s.",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ hash_id);
aclk_chart_config_updated(&chart_config, 1);
destroy_chart_config_updated(&chart_config);
- }
- else
- log_access("AC [%s (%s)]: Chart config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id);
+ } else
+ log_access(
+ "ACLK STA [%s (%s)]: Chart config for %s not found.",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ hash_id);
- bind_fail:
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to reset statement when pushing chart config hash, rc = %d", rc);
- fail:
- freez((char *) cmd.data_param);
- buffer_free(sql);
- return rc;
+bind_fail:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when pushing chart config hash, rc = %d", rc);
+fail:
+ freez((char *)cmd.data_param);
+ buffer_free(sql);
+ return rc;
}
-
void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc;
sqlite3_stmt *res = NULL;
- log_access("IN [%s (%s)]: Received ack chart sequence id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1);
+ char sql[ACLK_SYNC_QUERY_SIZE];
- BUFFER *sql = buffer_create(1024);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"UPDATE aclk_chart_%s SET date_updated=strftime('%%s','now') WHERE sequence_id <= @sequence_id "
+ "AND date_submitted IS NOT NULL AND date_updated IS NULL;", wc->uuid_str);
- buffer_sprintf(sql, "UPDATE aclk_chart_%s SET date_updated=strftime('%%s','now') WHERE sequence_id <= @sequence_id "
- "AND date_submitted IS NOT NULL AND date_updated IS NULL;", wc->uuid_str);
-
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement count sequence ids in the database");
- goto prepare_fail;
+ error_report("Failed to prepare statement to ack chart sequence ids");
+ return;
}
rc = sqlite3_bind_int64(res, 1, (uint64_t) cmd.param1);
@@ -539,28 +560,34 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
rc = execute_insert(res);
if (rc != SQLITE_DONE)
error_report("Failed to ACK sequence id, rc = %d", rc);
+ else
+ log_access(
+ "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ cmd.param1);
- bind_fail:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize statement to ACK older sequence ids, rc = %d", rc);
-
- prepare_fail:
- buffer_free(sql);
- return;
+bind_fail:
+ if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
+ error_report("Failed to finalize statement to ACK older sequence ids, rc = %d", rc);
+ return;
}
void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
BUFFER *sql = buffer_create(1024);
- buffer_sprintf(sql, "UPDATE aclk_chart_%s SET status = NULL, date_submitted = NULL WHERE sequence_id >= %"PRIu64";",
- wc->uuid_str, cmd.param1);
+ buffer_sprintf(
+ sql,
+ "UPDATE aclk_chart_%s SET status = NULL, date_submitted = NULL WHERE sequence_id >= %" PRIu64 ";",
+ wc->uuid_str,
+ cmd.param1);
db_execute(buffer_tostring(sql));
if (cmd.param1 == 1) {
- db_lock();
buffer_flush(sql);
- log_access("IN [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \
"DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+ db_lock();
db_execute("BEGIN TRANSACTION;");
db_execute(buffer_tostring(sql));
@@ -587,12 +614,14 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
rrdset_unlock(st);
}
rrdhost_unlock(host);
- }
- else
+ } else
error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
- }
- else {
- log_access("AC [%s (%s)]: Restarting chart sync from sequence %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1);
+ } else {
+ log_access(
+ "ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ cmd.param1);
wc->chart_payload_count = sql_get_pending_count(wc);
sql_get_last_chart_sequence(wc);
}
@@ -601,7 +630,6 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
return;
}
-
//
// Functions called directly from ACLK threads and will queue commands
//
@@ -617,7 +645,12 @@ void aclk_get_chart_config(char **hash_id)
cmd.opcode = ACLK_DATABASE_PUSH_CHART_CONFIG;
for (int i = 0; hash_id[i]; ++i) {
// TODO: Verify that we have a valid hash_id
- log_access("IN [%s (%s)]: Request %d for chart config with hash %s received.", wc->node_id, wc->host ? wc->host->hostname : "N/A", i, hash_id[i]);
+ log_access(
+ "ACLK REQ [%s (%s)]: Request %d for chart config with hash %s received.",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ i,
+ hash_id[i]);
cmd.data_param = (void *)strdupz(hash_id[i]);
aclk_database_enq_cmd(wc, &cmd);
}
@@ -632,13 +665,13 @@ static void aclk_submit_param_command(char *node_id, enum aclk_database_opcode a
if (unlikely(!node_id))
return;
- struct aclk_database_worker_config *wc = NULL;
+ struct aclk_database_worker_config *wc = NULL;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = aclk_command;
cmd.param1 = param;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = find_host_by_node_id(node_id);
if (likely(host))
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
@@ -647,7 +680,7 @@ static void aclk_submit_param_command(char *node_id, enum aclk_database_opcode a
aclk_database_enq_cmd(wc, &cmd);
else {
if (aclk_worker_enq_cmd(node_id, &cmd))
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
}
return;
}
@@ -657,7 +690,10 @@ void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id)
if (unlikely(!node_id))
return;
- log_access("AC [%s (N/A)]: Node reports last sequence id received %"PRIu64, node_id, last_sequence_id);
+ char *hostname = get_hostname_by_node_id(node_id);
+ log_access("ACLK REQ [%s (%s)]: CHARTS ACKNOWLEDGED upto %" PRIu64, node_id, hostname ? hostname : "N/A",
+ last_sequence_id);
+ freez(hostname);
aclk_submit_param_command(node_id, ACLK_DATABASE_CHART_ACK, last_sequence_id);
return;
}
@@ -669,15 +705,17 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
if (unlikely(!node_id))
return;
- log_access("IN [%s (N/A)]: Start streaming charts from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id,
- sequence_id, created_at, batch_id);
+ // log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id,
+ // sequence_id, created_at, batch_id);
uuid_t node_uuid;
- if (uuid_parse(node_id, node_uuid))
+ if (uuid_parse(node_id, node_uuid)) {
+ log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id);
return;
+ }
struct aclk_database_worker_config *wc = NULL;
- rrd_wrlock();
+ rrd_rdlock();
RRDHOST *host = localhost;
while(host) {
if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
@@ -692,10 +730,23 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
wc->batch_id = batch_id;
__sync_synchronize();
wc->batch_created = now_realtime_sec();
+ log_access(
+ "ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ wc->chart_sequence_id,
+ wc->chart_timestamp,
+ wc->chart_reset_count);
if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) {
- log_access("AC [%s (%s)]: Requesting full resync from the cloud "
- "(reset=%d, remote_seq=%"PRIu64", local_seq=%"PRIu64")"
- , wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_reset_count, sequence_id, wc->chart_sequence_id);
+ log_access(
+ "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST "
+ "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ",
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ sequence_id,
+ wc->chart_sequence_id,
+ wc->chart_reset_count);
+
chart_reset_t chart_reset;
chart_reset.claim_id = is_agent_claimed();
if (chart_reset.claim_id) {
@@ -710,26 +761,34 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
// TODO: handle timestamp
- if (sequence_id < wc->chart_sequence_id || !sequence_id) { // || created_at != wc->chart_timestamp) {
- log_access("AC [%s (%s)]: Reset streaming charts from sequence %"PRIu64 \
- " t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id,
- wc->chart_timestamp, wc->chart_reset_count);
+ if (sequence_id < wc->chart_sequence_id ||
+ !sequence_id) { // || created_at != wc->chart_timestamp) {
+ log_access(
+ "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64,
+ wc->node_id,
+ wc->host ? wc->host->hostname : "N/A",
+ wc->chart_sequence_id,
+ wc->chart_timestamp,
+ wc->batch_id);
cmd.opcode = ACLK_DATABASE_RESET_CHART;
cmd.param1 = sequence_id + 1;
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
- }
- else {
- log_access("AC [%s (%s)]: Start streaming charts enabled -- last streamed sequence %"PRIu64 \
- " t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id,
- wc->chart_timestamp, wc->chart_reset_count);
+ } else {
+// log_access(
+// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64
+// " t=%ld resets=%d",
+// wc->node_id,
+// wc->host ? wc->host->hostname : "N/A",
+// wc->chart_sequence_id,
+// wc->chart_timestamp,
+// wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
}
}
- }
- else
- log_access("AC [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ } else
+ log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
return;
}
host = host->next;
@@ -761,7 +820,7 @@ static RRD_MEMORY_MODE sql_get_host_memory_mode(uuid_t *host_id)
}
while (sqlite3_step(res) == SQLITE_ROW) {
- memory_mode = (RRD_MEMORY_MODE) sqlite3_column_int(res, 0);
+ memory_mode = (RRD_MEMORY_MODE)sqlite3_column_int(res, 0);
}
failed:
@@ -771,11 +830,13 @@ failed:
return memory_mode;
}
-#define SELECT_HOST_DIMENSION_LIST "SELECT d.dim_id, c.update_every, c.type||'.'||c.id, d.id, d.name FROM chart c, dimension d " \
- "WHERE d.chart_id = c.chart_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
+#define SELECT_HOST_DIMENSION_LIST \
+ "SELECT d.dim_id, c.update_every, c.type||'.'||c.id, d.id, d.name FROM chart c, dimension d " \
+ "WHERE d.chart_id = c.chart_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
-#define SELECT_HOST_CHART_LIST "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
- "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
+#define SELECT_HOST_CHART_LIST \
+ "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
+ "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
@@ -821,10 +882,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
goto failed;
}
- time_t start_time = LONG_MAX;
- time_t first_entry_t;
- time_t last_entry_t;
+ time_t start_time = LONG_MAX;
+ time_t first_entry_t;
+ time_t last_entry_t;
uint32_t update_every = 0;
+ uint32_t dimension_update_count = 0;
+ int send_status;
struct retention_updated rotate_data;
@@ -840,11 +903,11 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.claim_id = claim_id;
rotate_data.node_id = strdupz(wc->node_id);
- // time_t now = now_realtime_sec();
+ time_t now = now_realtime_sec();
while (sqlite3_step(res) == SQLITE_ROW) {
- if (!update_every || update_every != (uint32_t) sqlite3_column_int(res, 1)) {
+ if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) {
if (update_every) {
- debug(D_ACLK_SYNC,"Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
+ debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
if (start_time == LONG_MAX)
rotate_data.interval_durations[rotate_data.interval_duration_count].retention = 0;
else
@@ -852,13 +915,14 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.rotation_timestamp.tv_sec - start_time;
rotate_data.interval_duration_count++;
}
- update_every = (uint32_t) sqlite3_column_int(res, 1);
+ update_every = (uint32_t)sqlite3_column_int(res, 1);
rotate_data.interval_durations[rotate_data.interval_duration_count].update_every = update_every;
start_time = LONG_MAX;
}
#ifdef ENABLE_DBENGINE
if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
- rc = rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t);
+ rc =
+ rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t);
else
#endif
{
@@ -869,8 +933,7 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
first_entry_t = rrdset_first_entry_t(st);
last_entry_t = rrdset_last_entry_t(st);
}
- }
- else {
+ } else {
rc = 0;
first_entry_t = rotate_data.rotation_timestamp.tv_sec;
}
@@ -878,6 +941,24 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
if (likely(!rc && first_entry_t))
start_time = MIN(start_time, first_entry_t);
+
+ if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) {
+ int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every));
+ if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
+ (void)aclk_upd_dimension_event(
+ wc,
+ claim_id,
+ (uuid_t *)sqlite3_column_blob(res, 0),
+ (const char *)(const char *)sqlite3_column_text(res, 3),
+ (const char *)(const char *)sqlite3_column_text(res, 4),
+ (const char *)(const char *)sqlite3_column_text(res, 2),
+ first_entry_t,
+ live ? 0 : last_entry_t,
+ &send_status);
+ if (!send_status)
+ dimension_update_count++;
+ }
+ }
}
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@@ -891,8 +972,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
#ifdef NETDATA_INTERNAL_CHECKS
for (int i = 0; i < rotate_data.interval_duration_count; ++i)
- info("Update for host %s (node %s) for %u Retention = %u", wc->host_guid, wc->node_id,
- rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention);
+ info(
+ "Update for host %s (node %s) for %u Retention = %u",
+ wc->host_guid,
+ wc->node_id,
+ rotate_data.interval_durations[i].update_every,
+ rotate_data.interval_durations[i].retention);
#endif
aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
@@ -906,63 +991,60 @@ failed:
return;
}
-
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc)
{
- BUFFER *sql = buffer_create(1024);
- sqlite3_stmt *res = NULL;
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ static __thread sqlite3_stmt *res = NULL;
- buffer_sprintf(sql,"SELECT count(1) FROM aclk_chart_%s ac WHERE ac.date_submitted IS NULL;", wc->uuid_str);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT count(1) FROM aclk_chart_%s ac WHERE ac.date_submitted IS NULL;", wc->uuid_str);
int rc;
uint32_t chart_payload_count = 0;
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
- if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement to count pending messages");
- goto fail;
+ if (unlikely(!res)) {
+ rc = prepare_statement(db_meta, sql, &res);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to count pending messages");
+ return 0;
+ }
}
while (sqlite3_step(res) == SQLITE_ROW)
chart_payload_count = (uint32_t) sqlite3_column_int(res, 0);
- rc = sqlite3_finalize(res);
+ rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement when fetching pending messages, rc = %d", rc);
-fail:
- buffer_free(sql);
return chart_payload_count;
}
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
{
- BUFFER *sql = buffer_create(1024);
+ char sql[ACLK_SYNC_QUERY_SIZE];
- buffer_sprintf(sql,"SELECT ac.sequence_id, ac.date_created FROM aclk_chart_%s ac " \
- "WHERE ac.date_submitted IS NOT NULL ORDER BY ac.sequence_id DESC LIMIT 1;", wc->uuid_str);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT ac.sequence_id, ac.date_created FROM aclk_chart_%s ac " \
+ "WHERE ac.date_submitted IS NOT NULL ORDER BY ac.sequence_id DESC LIMIT 1;", wc->uuid_str);
int rc;
sqlite3_stmt *res = NULL;
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to find last chart sequence id");
- goto fail;
+ return;
}
wc->chart_sequence_id = 0;
wc->chart_timestamp = 0;
while (sqlite3_step(res) == SQLITE_ROW) {
- wc->chart_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
- wc->chart_timestamp = (time_t) sqlite3_column_int64(res, 1);
+ wc->chart_sequence_id = (uint64_t)sqlite3_column_int64(res, 0);
+ wc->chart_timestamp = (time_t)sqlite3_column_int64(res, 1);
}
- debug(D_ACLK_SYNC,"Node %s reports last sequence_id=%"PRIu64, wc->node_id, wc->chart_sequence_id);
+ debug(D_ACLK_SYNC, "Node %s reports last sequence_id=%" PRIu64, wc->node_id, wc->chart_sequence_id);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement when fetching chart sequence info, rc = %d", rc);
-fail:
- buffer_free(sql);
return;
}
@@ -973,6 +1055,184 @@ int queue_dimension_to_aclk(RRDDIM *rd)
return rc;
}
+void aclk_send_dimension_update(RRDDIM *rd)
+{
+ if (!aclk_use_new_cloud_arch)
+ return;
+
+ char *claim_id = is_agent_claimed();
+ if (unlikely(!claim_id))
+ return;
+
+ time_t first_entry_t = rrddim_first_entry_t(rd);
+ time_t last_entry_t = rrddim_last_entry_t(rd);
+
+ time_t now = now_realtime_sec();
+ int live = ((now - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
+
+ if (!live || rd->state->aclk_live_status != live || !first_entry_t) {
+ (void)aclk_upd_dimension_event(
+ rd->rrdset->rrdhost->dbsync_worker,
+ claim_id,
+ &rd->state->metric_uuid,
+ rd->id,
+ rd->name,
+ rd->rrdset->id,
+ first_entry_t,
+ live ? 0 : last_entry_t,
+ NULL);
+
+ if (!first_entry_t)
+ debug(
+ D_ACLK_SYNC,
+ "%s: Update dimension chart=%s dim=%s live=%d (%ld, %ld)",
+ rd->rrdset->rrdhost->hostname,
+ rd->rrdset->name,
+ rd->name,
+ live,
+ first_entry_t,
+ last_entry_t);
+ else
+ debug(
+ D_ACLK_SYNC,
+ "%s: Update dimension chart=%s dim=%s live=%d (%ld, %ld) collected %ld seconds ago",
+ rd->rrdset->rrdhost->hostname,
+ rd->rrdset->name,
+ rd->name,
+ live,
+ first_entry_t,
+ last_entry_t,
+ now - last_entry_t);
+ rd->state->aclk_live_status = live;
+ }
+
+ freez(claim_id);
+ return;
+}
+
+#define SQL_SEQ_NULL(result, n) sqlite3_column_type(result, n) == SQLITE_NULL ? 0 : sqlite3_column_int64(result, n)
+
+struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *aclk_statistics = NULL;
+
+ struct aclk_database_worker_config *wc = NULL;
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (!wc)
+ return NULL;
+
+ aclk_statistics = callocz(1, sizeof(struct aclk_chart_sync_stats));
+
+ aclk_statistics->updates = wc->chart_updates;
+ aclk_statistics->batch_id = wc->batch_id;
+
+ char host_uuid_fixed[GUID_LEN + 1];
+
+ strncpy(host_uuid_fixed, host->machine_guid, GUID_LEN);
+ host_uuid_fixed[GUID_LEN] = 0;
+
+ host_uuid_fixed[8] = '_';
+ host_uuid_fixed[13] = '_';
+ host_uuid_fixed[18] = '_';
+ host_uuid_fixed[23] = '_';
+
+ sqlite3_stmt *res = NULL;
+ BUFFER *sql = buffer_create(1024);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NOT NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_updated IS NOT NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT max(date_created), max(date_submitted), max(date_updated), 0 FROM aclk_chart_%s;", host_uuid_fixed);
+
+ int rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ buffer_free(sql);
+ freez(aclk_statistics);
+ return NULL;
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_pend = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_pend = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_sent = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_sent = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->max_date_created = (time_t) SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_date_submitted = (time_t) SQL_SEQ_NULL(res, 1);
+ aclk_statistics->max_date_ack = (time_t) SQL_SEQ_NULL(res, 2);
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when fetching aclk sync statistics, rc = %d", rc);
+
+ buffer_free(sql);
+ return aclk_statistics;
+}
+
+void sql_check_chart_liveness(RRDSET *st) {
+ RRDDIM *rd;
+
+ if (unlikely(st->state->is_ar_chart))
+ return;
+
+ rrdset_rdlock(st);
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) {
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name);
+ rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ }
+ }
+ else
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] chart definition already submitted", st->name);
+ time_t mark = now_realtime_sec();
+
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name);
+ rrddim_foreach_read(rd, st) {
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
+ int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every;
+ if (unlikely(live != rd->state->aclk_live_status)) {
+ if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (likely(!queue_dimension_to_aclk(rd))) {
+ debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
+ rd->state->aclk_live_status = live;
+ rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
+ }
+ }
+ }
+ else
+ debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
+ }
+ }
+ rrdset_unlock(st);
+}
+
#endif //ENABLE_NEW_CLOUD_PROTOCOL
// ST is read locked
@@ -994,4 +1254,3 @@ int queue_chart_to_aclk(RRDSET *st)
st, ACLK_DATABASE_ADD_CHART);
#endif
}
-
diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h
index 67d81a53..1d25de24 100644
--- a/database/sqlite/sqlite_aclk_chart.h
+++ b/database/sqlite/sqlite_aclk_chart.h
@@ -12,10 +12,30 @@ typedef enum payload_type {
extern sqlite3 *db_meta;
-#ifndef RRDSET_MINIMUM_LIVE_COUNT
-#define RRDSET_MINIMUM_LIVE_COUNT 3
+#ifndef RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER
+#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3)
#endif
+#ifndef ACLK_MAX_DIMENSION_CLEANUP
+#define ACLK_MAX_DIMENSION_CLEANUP (500)
+#endif
+
+struct aclk_chart_sync_stats {
+ int updates;
+ uint64_t batch_id;
+ uint64_t min_seqid;
+ uint64_t max_seqid;
+ uint64_t min_seqid_pend;
+ uint64_t max_seqid_pend;
+ uint64_t min_seqid_sent;
+ uint64_t max_seqid_sent;
+ uint64_t min_seqid_ack;
+ uint64_t max_seqid_ack;
+ time_t max_date_created;
+ time_t max_date_submitted;
+ time_t max_date_ack;
+};
+
extern int queue_chart_to_aclk(RRDSET *st);
extern int queue_dimension_to_aclk(RRDDIM *rd);
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
@@ -34,4 +54,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
+void aclk_send_dimension_update(RRDDIM *rd);
+struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host);
+void sql_check_chart_liveness(RRDSET *st);
#endif //NETDATA_SQLITE_ACLK_CHART_H
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c
index 6261b9af..97e6bebd 100644
--- a/database/sqlite/sqlite_aclk_node.c
+++ b/database/sqlite/sqlite_aclk_node.c
@@ -17,16 +17,23 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
if (!wc->host)
return;
- rrd_wrlock();
+ rrd_rdlock();
node_info.node_id = wc->node_id;
node_info.claim_id = is_agent_claimed();
node_info.machine_guid = wc->host_guid;
node_info.child = (wc->host != localhost);
- node_info.ml_info.ml_capable = localhost->system_info->ml_capable;
- node_info.ml_info.ml_enabled = wc->host->ml_host != NULL;
+ node_info.ml_info.ml_capable = ml_capable(localhost);
+ node_info.ml_info.ml_enabled = ml_enabled(wc->host);
now_realtime_timeval(&node_info.updated_at);
RRDHOST *host = wc->host;
+ char *host_version = NULL;
+ if (host != localhost) {
+ netdata_mutex_lock(&host->receiver_lock);
+ host_version =
+ strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : "unknown");
+ netdata_mutex_unlock(&host->receiver_lock);
+ }
node_info.data.name = host->hostname;
node_info.data.os = (char *) host->os;
@@ -35,15 +42,15 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.kernel_name = host->system_info->kernel_name;
node_info.data.kernel_version = host->system_info->kernel_version;
node_info.data.architecture = host->system_info->architecture;
- node_info.data.cpus = str2uint32_t(host->system_info->host_cores);
- node_info.data.cpu_frequency = host->system_info->host_cpu_freq;
- node_info.data.memory = host->system_info->host_ram_total;
- node_info.data.disk_space = host->system_info->host_disk_space;
- node_info.data.version = VERSION;
+ node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores) : 0;
+ node_info.data.cpu_frequency = host->system_info->host_cpu_freq ? host->system_info->host_cpu_freq : "0";
+ node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0";
+ node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0";
+ node_info.data.version = host_version ? host_version : VERSION;
node_info.data.release_channel = "nightly";
node_info.data.timezone = (char *) host->abbrev_timezone;
- node_info.data.virtualization_type = host->system_info->virtualization;
- node_info.data.container_type = host->system_info->container;
+ node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown";
+ node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown";
node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", "");
node_info.data.services = NULL; // char **
node_info.data.service_count = 0;
@@ -56,11 +63,12 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.host_labels_head = labels->head;
aclk_update_node_info(&node_info);
- log_access("OG [%s (%s)]: Sending node info for guid [%s] (%s).", wc->node_id, wc->host->hostname, wc->host_guid, wc->host == localhost ? "parent" : "child");
+ log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, wc->host->hostname, wc->host_guid, wc->host == localhost ? "parent" : "child");
netdata_rwlock_unlock(&labels->labels_rwlock);
rrd_unlock();
freez(node_info.claim_id);
+ freez(host_version);
#else
UNUSED(wc);
#endif
diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c
index d5afdb6e..1e1d2a74 100644
--- a/database/sqlite/sqlite_functions.c
+++ b/database/sqlite/sqlite_functions.c
@@ -2,6 +2,8 @@
#include "sqlite_functions.h"
+#define DB_METADATA_VERSION "1"
+
const char *database_config[] = {
"PRAGMA auto_vacuum=incremental; PRAGMA synchronous=1 ; PRAGMA journal_mode=WAL; PRAGMA temp_store=MEMORY;",
"PRAGMA journal_size_limit=16777216;",
@@ -40,6 +42,9 @@ const char *database_config[] = {
"CREATE VIEW IF NOT EXISTS v_chart_hash as SELECT ch.*, chm.chart_id FROM chart_hash ch, chart_hash_map chm "
"WHERE ch.hash_id = chm.hash_id;",
+ "CREATE TRIGGER IF NOT EXISTS ins_host AFTER INSERT ON host BEGIN INSERT INTO node_instance (host_id, date_created)"
+ " SELECT new.host_id, strftime(\"%s\") WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END;",
+
"CREATE TRIGGER IF NOT EXISTS tr_v_chart_hash INSTEAD OF INSERT on v_chart_hash BEGIN "
"INSERT INTO chart_hash (hash_id, type, id, name, family, context, title, unit, plugin, "
"module, priority, chart_type, last_used) "
@@ -49,6 +54,7 @@ const char *database_config[] = {
"INSERT INTO chart_hash_map (chart_id, hash_id) values (new.chart_id, new.hash_id) "
"on conflict (chart_id, hash_id) do nothing; END; ",
+ "PRAGMA user_version="DB_METADATA_VERSION";",
NULL
};
@@ -115,7 +121,7 @@ static int store_active_uuid_object(sqlite3_stmt **res, char *statement, uuid_t
// Check if we should need to prepare the statement
if (!*res) {
- rc = sqlite3_prepare_v2(db_meta, statement, -1, res, 0);
+ rc = prepare_statement(db_meta, statement, res);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to store active object, rc = %d", rc);
return rc;
@@ -136,7 +142,7 @@ static int store_active_uuid_object(sqlite3_stmt **res, char *statement, uuid_t
*/
void store_active_chart(uuid_t *chart_uuid)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
int rc;
if (unlikely(!db_meta)) {
@@ -152,7 +158,7 @@ void store_active_chart(uuid_t *chart_uuid)
if (rc != SQLITE_DONE)
error_report("Failed to store active chart, rc = %d", rc);
- rc = sqlite3_finalize(res);
+ rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement in store active chart, rc = %d", rc);
return;
@@ -164,7 +170,7 @@ void store_active_chart(uuid_t *chart_uuid)
*/
void store_active_dimension(uuid_t *dimension_uuid)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
int rc;
if (unlikely(!db_meta)) {
@@ -180,7 +186,7 @@ void store_active_dimension(uuid_t *dimension_uuid)
if (rc != SQLITE_DONE)
error_report("Failed to store active dimension, rc = %d", rc);
- rc = sqlite3_finalize(res);
+ rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement in store active dimension, rc = %d", rc);
return;
@@ -905,6 +911,49 @@ bind_fail:
return 1;
}
+/*
+ * Store set option for a dimension
+ */
+int sql_set_dimension_option(uuid_t *dim_uuid, char *option)
+{
+ sqlite3_stmt *res = NULL;
+ int rc;
+
+ if (unlikely(!db_meta)) {
+ if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
+ return 0;
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to update dimension options");
+ return 0;
+ };
+
+ rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ if (!option || !strcmp(option,"unhide"))
+ rc = sqlite3_bind_null(res, 1);
+ else
+ rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to update dimension option, rc = %d", rc);
+
+bind_fail:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement in update dimension options, rc = %d", rc);
+ return 0;
+}
+
//
// Support for archived charts
@@ -1286,7 +1335,7 @@ void add_migrated_file(char *path, uint64_t file_size)
void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, char *value)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
int rc;
if (unlikely(!db_meta)) {
@@ -1295,10 +1344,12 @@ void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, cha
return;
}
- rc = sqlite3_prepare_v2(db_meta, SQL_INS_CHART_LABEL, -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement store chart labels");
- return;
+ if (unlikely(!res)) {
+ rc = prepare_statement(db_meta, SQL_INS_CHART_LABEL, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement store chart labels");
+ return;
+ }
}
rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
@@ -1330,8 +1381,8 @@ void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, cha
error_report("Failed to store chart label entry, rc = %d", rc);
failed:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when storing chart label information");
+ if (unlikely(sqlite3_reset(res) != SQLITE_OK))
+ error_report("Failed to reset the prepared statement when storing chart label information");
return;
}
@@ -1402,12 +1453,14 @@ static RRDDIM *create_rrdim_entry(RRDSET *st, char *id, char *name, uuid_t *metr
}
#endif
-#define SELECT_CHART_CONTEXT "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id from chart c, " \
+#define SELECT_CHART_CONTEXT "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id, " \
+ "c.context, CASE WHEN d.options = 'hidden' THEN 1 else 0 END from chart c, " \
"dimension d, host h " \
"where d.chart_id = c.chart_id and c.host_id = h.host_id and c.host_id = @host_id and c.context = @context " \
"order by c.chart_id asc, c.type||c.id desc;"
-#define SELECT_CHART_SINGLE "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id, c.context from chart c, " \
+#define SELECT_CHART_SINGLE "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id, " \
+ "c.context, CASE WHEN d.options = 'hidden' THEN 1 else 0 END from chart c, " \
"dimension d, host h " \
"where d.chart_id = c.chart_id and c.host_id = h.host_id and c.host_id = @host_id and c.type||'.'||c.id = @chart " \
"order by c.chart_id asc, c.type||'.'||c.id desc;"
@@ -1501,6 +1554,8 @@ void sql_build_context_param_list(struct context_param **param_list, RRDHOST *ho
st->last_entry_t = MAX(st->last_entry_t, (*param_list)->last_entry_t);
RRDDIM *rd = create_rrdim_entry(st, (char *)sqlite3_column_text(res, 1), (char *)sqlite3_column_text(res, 2), &rrdeng_uuid);
+ if (sqlite3_column_int(res, 9) == 1)
+ rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
rd->next = (*param_list)->rd;
(*param_list)->rd = rd;
}
@@ -1828,25 +1883,75 @@ failed:
return rc - 1;
}
-#define SQL_SELECT_HOST_BY_NODE_ID "select host_id from node_instance where node_id = @node_id;"
+#define SQL_SELECT_HOSTNAME_BY_NODE_ID "SELECT h.hostname FROM node_instance ni, " \
+"host h WHERE ni.host_id = h.host_id AND ni.node_id = @node_id;"
-int get_host_id(uuid_t *node_id, uuid_t *host_id)
+char *get_hostname_by_node_id(char *node)
{
sqlite3_stmt *res = NULL;
+ char *hostname = NULL;
int rc;
+ rrd_rdlock();
+ RRDHOST *host = find_host_by_node_id(node);
+ rrd_unlock();
+ if (host)
+ return strdupz(host->hostname);
+
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
error_report("Database has not been initialized");
- return 1;
+ return NULL;
+ }
+
+ uuid_t node_id;
+ if (uuid_parse(node, node_id))
+ return NULL;
+
+ rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOSTNAME_BY_NODE_ID, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to fetch hostname by node id");
+ return NULL;
}
- rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_NODE_ID, -1, &res, 0);
+ rc = sqlite3_bind_blob(res, 1, &node_id, sizeof(node_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to select node instance information for a node");
+ error_report("Failed to bind host_id parameter to select node instance information");
+ goto failed;
+ }
+
+ rc = sqlite3_step(res);
+ if (likely(rc == SQLITE_ROW))
+ hostname = strdupz((char *)sqlite3_column_text(res, 0));
+
+failed:
+ if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
+ error_report("Failed to finalize the prepared statement when search for hostname by node id");
+
+ return hostname;
+}
+
+#define SQL_SELECT_HOST_BY_NODE_ID "select host_id from node_instance where node_id = @node_id;"
+
+int get_host_id(uuid_t *node_id, uuid_t *host_id)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc;
+
+ if (unlikely(!db_meta)) {
+ if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
+ error_report("Database has not been initialized");
return 1;
}
+ if (unlikely(!res)) {
+ rc = prepare_statement(db_meta, SQL_SELECT_HOST_BY_NODE_ID, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to select node instance information for a node");
+ return 1;
+ }
+ }
+
rc = sqlite3_bind_blob(res, 1, node_id, sizeof(*node_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id parameter to select node instance information");
@@ -1858,8 +1963,8 @@ int get_host_id(uuid_t *node_id, uuid_t *host_id)
uuid_copy(*host_id, *((uuid_t *) sqlite3_column_blob(res, 0)));
failed:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when selecting node instance information");
+ if (unlikely(sqlite3_reset(res) != SQLITE_OK))
+ error_report("Failed to reset the prepared statement when selecting node instance information");
return (rc == SQLITE_ROW) ? 0 : -1;
}
@@ -1978,7 +2083,7 @@ struct node_instance_list *get_node_list(void)
node_list = callocz(row + 1, sizeof(*node_list));
int max_rows = row;
row = 0;
- rrd_wrlock();
+ rrd_rdlock();
while (sqlite3_step(res) == SQLITE_ROW) {
if (sqlite3_column_bytes(res, 0) == sizeof(uuid_t))
uuid_copy(node_list[row].node_id, *((uuid_t *)sqlite3_column_blob(res, 0)));
@@ -2011,7 +2116,7 @@ failed:
void sql_load_node_id(RRDHOST *host)
{
- sqlite3_stmt *res = NULL;
+ static __thread sqlite3_stmt *res = NULL;
int rc;
if (unlikely(!db_meta)) {
@@ -2020,11 +2125,13 @@ void sql_load_node_id(RRDHOST *host)
return;
}
- rc = sqlite3_prepare_v2(db_meta, SQL_GET_HOST_NODE_ID, -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to fetch node id");
- return;
- };
+ if (unlikely(!res)) {
+ rc = prepare_statement(db_meta, SQL_GET_HOST_NODE_ID, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to fetch node id");
+ return;
+ };
+ }
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
@@ -2041,8 +2148,8 @@ void sql_load_node_id(RRDHOST *host)
}
failed:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when loading node instance information");
+ if (unlikely(sqlite3_reset(res) != SQLITE_OK))
+ error_report("Failed to reset the prepared statement when loading node instance information");
return;
};
diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h
index 3e41f6aa..30b8dee6 100644
--- a/database/sqlite/sqlite_functions.h
+++ b/database/sqlite/sqlite_functions.h
@@ -98,4 +98,6 @@ extern void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id);
extern struct node_instance_list *get_node_list(void);
extern void sql_load_node_id(RRDHOST *host);
extern void compute_chart_hash(RRDSET *st);
+extern int sql_set_dimension_option(uuid_t *dim_uuid, char *option);
+char *get_hostname_by_node_id(char *node_id);
#endif //NETDATA_SQLITE_FUNCTIONS_H
diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c
index 27c67c3a..8ba95628 100644
--- a/database/sqlite/sqlite_health.c
+++ b/database/sqlite/sqlite_health.c
@@ -900,7 +900,8 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg)
#endif
int alert_hash_and_store_config(
uuid_t hash_id,
- struct alert_config *cfg)
+ struct alert_config *cfg,
+ int store_hash)
{
#if !defined DISABLE_CLOUD && defined ENABLE_HTTPS
EVP_MD_CTX *evpctx;
@@ -946,10 +947,12 @@ int alert_hash_and_store_config(
uuid_copy(hash_id, *((uuid_t *)&hash_value));
/* store everything, so it can be recreated when not in memory or just a subset ? */
- (void)sql_store_alert_config_hash( (uuid_t *)&hash_value, cfg);
+ if (store_hash)
+ (void)sql_store_alert_config_hash( (uuid_t *)&hash_value, cfg);
#else
UNUSED(hash_id);
UNUSED(cfg);
+ UNUSED(store_hash);
#endif
return 1;
diff --git a/database/sqlite/sqlite_health.h b/database/sqlite/sqlite_health.h
index 3b946089..ef837894 100644
--- a/database/sqlite/sqlite_health.h
+++ b/database/sqlite/sqlite_health.h
@@ -12,6 +12,6 @@ extern void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae);
extern void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae);
extern void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae);
extern void sql_health_alarm_log_cleanup(RRDHOST *host);
-extern int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg);
+extern int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg, int store_hash);
extern void sql_aclk_alert_clean_dead_entries(RRDHOST *host);
#endif //NETDATA_SQLITE_HEALTH_H