summaryrefslogtreecommitdiffstats
path: root/database/contexts
diff options
context:
space:
mode:
Diffstat (limited to 'database/contexts')
-rw-r--r--database/contexts/api_v2.c13
-rw-r--r--database/contexts/instance.c42
-rw-r--r--database/contexts/metric.c28
-rw-r--r--database/contexts/query_target.c10
-rw-r--r--database/contexts/rrdcontext.c65
5 files changed, 100 insertions, 58 deletions
diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c
index d0b27a2aa..3ca49a319 100644
--- a/database/contexts/api_v2.c
+++ b/database/contexts/api_v2.c
@@ -507,7 +507,7 @@ static bool rrdcontext_matches_alert(struct rrdcontext_to_json_v2_data *ctl, RRD
if (ctl->options & (CONTEXT_V2_OPTION_ALERTS_WITH_INSTANCES | CONTEXT_V2_OPTION_ALERTS_WITH_VALUES)) {
char key[20 + 1];
- snprintfz(key, 20, "%p", rcl);
+ snprintfz(key, sizeof(key) - 1, "%p", rcl);
struct sql_alert_instance_v2_entry z = {
.ati = ati,
@@ -616,10 +616,10 @@ static void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST_STATUS *s, const char *
buffer_json_member_add_object(wb, "source");
{
char buf[1024 + 1];
- snprintfz(buf, 1024, "[%s]:%d%s", s->ingest.peers.local.ip, s->ingest.peers.local.port, s->ingest.ssl ? ":SSL" : "");
+ snprintfz(buf, sizeof(buf) - 1, "[%s]:%d%s", s->ingest.peers.local.ip, s->ingest.peers.local.port, s->ingest.ssl ? ":SSL" : "");
buffer_json_member_add_string(wb, "local", buf);
- snprintfz(buf, 1024, "[%s]:%d%s", s->ingest.peers.peer.ip, s->ingest.peers.peer.port, s->ingest.ssl ? ":SSL" : "");
+ snprintfz(buf, sizeof(buf) - 1, "[%s]:%d%s", s->ingest.peers.peer.ip, s->ingest.peers.peer.port, s->ingest.ssl ? ":SSL" : "");
buffer_json_member_add_string(wb, "remote", buf);
stream_capabilities_to_json_array(wb, s->ingest.capabilities, "capabilities");
@@ -659,10 +659,10 @@ static void rrdhost_sender_to_json(BUFFER *wb, RRDHOST_STATUS *s, const char *ke
buffer_json_member_add_object(wb, "destination");
{
char buf[1024 + 1];
- snprintfz(buf, 1024, "[%s]:%d%s", s->stream.peers.local.ip, s->stream.peers.local.port, s->stream.ssl ? ":SSL" : "");
+ snprintfz(buf, sizeof(buf) - 1, "[%s]:%d%s", s->stream.peers.local.ip, s->stream.peers.local.port, s->stream.ssl ? ":SSL" : "");
buffer_json_member_add_string(wb, "local", buf);
- snprintfz(buf, 1024, "[%s]:%d%s", s->stream.peers.peer.ip, s->stream.peers.peer.port, s->stream.ssl ? ":SSL" : "");
+ snprintfz(buf, sizeof(buf) - 1, "[%s]:%d%s", s->stream.peers.peer.ip, s->stream.peers.peer.port, s->stream.ssl ? ":SSL" : "");
buffer_json_member_add_string(wb, "remote", buf);
stream_capabilities_to_json_array(wb, s->stream.capabilities, "capabilities");
@@ -674,6 +674,7 @@ static void rrdhost_sender_to_json(BUFFER *wb, RRDHOST_STATUS *s, const char *ke
buffer_json_member_add_uint64(wb, "metadata", s->stream.sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
buffer_json_member_add_uint64(wb, "functions", s->stream.sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
buffer_json_member_add_uint64(wb, "replication", s->stream.sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
+ buffer_json_member_add_uint64(wb, "dyncfg", s->stream.sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DYNCFG]);
}
buffer_json_object_close(wb); // traffic
@@ -685,7 +686,7 @@ static void rrdhost_sender_to_json(BUFFER *wb, RRDHOST_STATUS *s, const char *ke
{
if (d->ssl) {
- snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
+ snprintfz(buf, sizeof(buf) - 1, "%s:SSL", string2str(d->destination));
buffer_json_member_add_string(wb, "destination", buf);
}
else
diff --git a/database/contexts/instance.c b/database/contexts/instance.c
index 8a60ce662..39837dbf6 100644
--- a/database/contexts/instance.c
+++ b/database/contexts/instance.c
@@ -329,11 +329,11 @@ inline void rrdinstance_from_rrdset(RRDSET *st) {
RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, string2str(tri.id), &tri, sizeof(tri));
- RRDCONTEXT_ACQUIRED *rca_old = st->rrdcontext;
- RRDINSTANCE_ACQUIRED *ria_old = st->rrdinstance;
+ RRDCONTEXT_ACQUIRED *rca_old = st->rrdcontexts.rrdcontext;
+ RRDINSTANCE_ACQUIRED *ria_old = st->rrdcontexts.rrdinstance;
- st->rrdcontext = rca;
- st->rrdinstance = ria;
+ st->rrdcontexts.rrdcontext = rca;
+ st->rrdcontexts.rrdinstance = ria;
if(rca == rca_old) {
rrdcontext_release(rca_old);
@@ -354,16 +354,16 @@ inline void rrdinstance_from_rrdset(RRDSET *st) {
// migrate all dimensions to the new metrics
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (!rd->rrdmetric) continue;
+ if (!rd->rrdcontexts.rrdmetric) continue;
- RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdmetric);
+ RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdcontexts.rrdmetric);
rrd_flags_replace(rm_old, RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
rm_old->rrddim = NULL;
rm_old->first_time_s = 0;
rm_old->last_time_s = 0;
- rrdmetric_release(rd->rrdmetric);
- rd->rrdmetric = NULL;
+ rrdmetric_release(rd->rrdcontexts.rrdmetric);
+ rd->rrdcontexts.rrdmetric = NULL;
rrdmetric_from_rrddim(rd);
}
@@ -406,12 +406,12 @@ inline void rrdinstance_from_rrdset(RRDSET *st) {
#define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__);
static inline RRDINSTANCE *rrdset_get_rrdinstance_with_trace(RRDSET *st, const char *function) {
- if(unlikely(!st->rrdinstance)) {
+ if(unlikely(!st->rrdcontexts.rrdinstance)) {
netdata_log_error("RRDINSTANCE: RRDSET '%s' is not linked to an RRDINSTANCE at %s()", rrdset_id(st), function);
return NULL;
}
- RRDINSTANCE *ri = rrdinstance_acquired_value(st->rrdinstance);
+ RRDINSTANCE *ri = rrdinstance_acquired_value(st->rrdcontexts.rrdinstance);
if(unlikely(!ri)) {
netdata_log_error("RRDINSTANCE: RRDSET '%s' lost its link to an RRDINSTANCE at %s()", rrdset_id(st), function);
return NULL;
@@ -439,14 +439,17 @@ inline void rrdinstance_rrdset_is_freed(RRDSET *st) {
rrdinstance_trigger_updates(ri, __FUNCTION__ );
- rrdinstance_release(st->rrdinstance);
- st->rrdinstance = NULL;
+ rrdinstance_release(st->rrdcontexts.rrdinstance);
+ st->rrdcontexts.rrdinstance = NULL;
- rrdcontext_release(st->rrdcontext);
- st->rrdcontext = NULL;
+ rrdcontext_release(st->rrdcontexts.rrdcontext);
+ st->rrdcontexts.rrdcontext = NULL;
+ st->rrdcontexts.collected = false;
}
inline void rrdinstance_rrdset_has_updated_retention(RRDSET *st) {
+ st->rrdcontexts.collected = false;
+
RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
if(unlikely(!ri)) return;
@@ -455,8 +458,10 @@ inline void rrdinstance_rrdset_has_updated_retention(RRDSET *st) {
}
inline void rrdinstance_updated_rrdset_name(RRDSET *st) {
+ st->rrdcontexts.collected = false;
+
// the chart may not be initialized when this is called
- if(unlikely(!st->rrdinstance)) return;
+ if(unlikely(!st->rrdcontexts.rrdinstance)) return;
RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
if(unlikely(!ri)) return;
@@ -491,6 +496,8 @@ inline void rrdinstance_updated_rrdset_flags_no_action(RRDINSTANCE *ri, RRDSET *
}
inline void rrdinstance_updated_rrdset_flags(RRDSET *st) {
+ st->rrdcontexts.collected = false;
+
RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
if(unlikely(!ri)) return;
@@ -503,6 +510,11 @@ inline void rrdinstance_updated_rrdset_flags(RRDSET *st) {
}
inline void rrdinstance_collected_rrdset(RRDSET *st) {
+ if(st->rrdcontexts.collected)
+ return;
+
+ st->rrdcontexts.collected = true;
+
RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
if(unlikely(!ri)) {
rrdcontext_updated_rrdset(st);
diff --git a/database/contexts/metric.c b/database/contexts/metric.c
index 55efde4e9..0f0785972 100644
--- a/database/contexts/metric.c
+++ b/database/contexts/metric.c
@@ -239,10 +239,10 @@ void rrdmetric_from_rrddim(RRDDIM *rd) {
if(unlikely(!rd->rrdset->rrdhost))
fatal("RRDMETRIC: rrdset '%s' does not have a rrdhost", rrdset_id(rd->rrdset));
- if(unlikely(!rd->rrdset->rrdinstance))
+ if(unlikely(!rd->rrdset->rrdcontexts.rrdinstance))
fatal("RRDMETRIC: rrdset '%s' does not have a rrdinstance", rrdset_id(rd->rrdset));
- RRDINSTANCE *ri = rrdinstance_acquired_value(rd->rrdset->rrdinstance);
+ RRDINSTANCE *ri = rrdinstance_acquired_value(rd->rrdset->rrdcontexts.rrdinstance);
RRDMETRIC trm = {
.id = string_dup(rd->id),
@@ -254,20 +254,21 @@ void rrdmetric_from_rrddim(RRDDIM *rd) {
RRDMETRIC_ACQUIRED *rma = (RRDMETRIC_ACQUIRED *)dictionary_set_and_acquire_item(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
- if(rd->rrdmetric)
- rrdmetric_release(rd->rrdmetric);
+ if(rd->rrdcontexts.rrdmetric)
+ rrdmetric_release(rd->rrdcontexts.rrdmetric);
- rd->rrdmetric = rma;
+ rd->rrdcontexts.rrdmetric = rma;
+ rd->rrdcontexts.collected = false;
}
#define rrddim_get_rrdmetric(rd) rrddim_get_rrdmetric_with_trace(rd, __FUNCTION__)
static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char *function) {
- if(unlikely(!rd->rrdmetric)) {
+ if(unlikely(!rd->rrdcontexts.rrdmetric)) {
netdata_log_error("RRDMETRIC: RRDDIM '%s' is not linked to an RRDMETRIC at %s()", rrddim_id(rd), function);
return NULL;
}
- RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdmetric);
+ RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdcontexts.rrdmetric);
if(unlikely(!rm)) {
netdata_log_error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function);
return NULL;
@@ -288,11 +289,14 @@ inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) {
rm->rrddim = NULL;
rrdmetric_trigger_updates(rm, __FUNCTION__ );
- rrdmetric_release(rd->rrdmetric);
- rd->rrdmetric = NULL;
+ rrdmetric_release(rd->rrdcontexts.rrdmetric);
+ rd->rrdcontexts.rrdmetric = NULL;
+ rd->rrdcontexts.collected = false;
}
inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) {
+ rd->rrdcontexts.collected = false;
+
RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
if(unlikely(!rm)) return;
@@ -305,6 +309,11 @@ inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) {
}
inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
+ if(rd->rrdcontexts.collected)
+ return;
+
+ rd->rrdcontexts.collected = true;
+
RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
if(unlikely(!rm)) return;
@@ -316,4 +325,3 @@ inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
-
diff --git a/database/contexts/query_target.c b/database/contexts/query_target.c
index d969691dd..95abc3e65 100644
--- a/database/contexts/query_target.c
+++ b/database/contexts/query_target.c
@@ -835,8 +835,8 @@ static ssize_t query_context_add(void *data, RRDCONTEXT_ACQUIRED *rca, bool quer
if(query_instance_add(qtl, qn, qc, qt->request.ria, queryable_context, false))
added++;
}
- else if(unlikely(qtl->st && qtl->st->rrdcontext == rca && qtl->st->rrdinstance)) {
- if(query_instance_add(qtl, qn, qc, qtl->st->rrdinstance, queryable_context, false))
+ else if(unlikely(qtl->st && qtl->st->rrdcontexts.rrdcontext == rca && qtl->st->rrdcontexts.rrdinstance)) {
+ if(query_instance_add(qtl, qn, qc, qtl->st->rrdcontexts.rrdinstance, queryable_context, false))
added++;
}
else {
@@ -894,11 +894,11 @@ static ssize_t query_node_add(void *data, RRDHOST *host, bool queryable_host) {
qn->node_id[0] = '\0';
// is the chart given valid?
- if(unlikely(qtl->st && (!qtl->st->rrdinstance || !qtl->st->rrdcontext))) {
+ if(unlikely(qtl->st && (!qtl->st->rrdcontexts.rrdinstance || !qtl->st->rrdcontexts.rrdcontext))) {
netdata_log_error("QUERY TARGET: RRDSET '%s' given, but it is not linked to rrdcontext structures. Linking it now.", rrdset_name(qtl->st));
rrdinstance_from_rrdset(qtl->st);
- if(unlikely(qtl->st && (!qtl->st->rrdinstance || !qtl->st->rrdcontext))) {
+ if(unlikely(qtl->st && (!qtl->st->rrdcontexts.rrdinstance || !qtl->st->rrdcontexts.rrdcontext))) {
netdata_log_error("QUERY TARGET: RRDSET '%s' given, but failed to be linked to rrdcontext structures. Switching to context query.",
rrdset_name(qtl->st));
@@ -918,7 +918,7 @@ static ssize_t query_node_add(void *data, RRDHOST *host, bool queryable_host) {
}
else if(unlikely(qtl->st)) {
// single chart data queries
- if(query_context_add(qtl, qtl->st->rrdcontext, true))
+ if(query_context_add(qtl, qtl->st->rrdcontexts.rrdcontext, true))
added++;
}
else {
diff --git a/database/contexts/rrdcontext.c b/database/contexts/rrdcontext.c
index 8538d17f2..9dee39be2 100644
--- a/database/contexts/rrdcontext.c
+++ b/database/contexts/rrdcontext.c
@@ -224,26 +224,31 @@ void rrdcontext_hub_checkpoint_command(void *ptr) {
struct ctxs_checkpoint *cmd = ptr;
if(!rrdhost_check_our_claim_id(cmd->claim_id)) {
- netdata_log_error("RRDCONTEXT: received checkpoint command for claim_id '%s', node id '%s', but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
- cmd->claim_id, cmd->node_id,
- localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
- cmd->claim_id);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "RRDCONTEXT: received checkpoint command for claim_id '%s', node id '%s', "
+ "but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
+ cmd->claim_id, cmd->node_id,
+ localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
+ cmd->claim_id);
return;
}
RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
if(!host) {
- netdata_log_error("RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', but there is no node with such node id here. Ignoring command.",
- cmd->claim_id,
- cmd->node_id);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', "
+ "but there is no node with such node id here. Ignoring command.",
+ cmd->claim_id, cmd->node_id);
return;
}
if(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) {
- netdata_log_info("RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', while node '%s' has an active context streaming.",
- cmd->claim_id, cmd->node_id, rrdhost_hostname(host));
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', "
+ "while node '%s' has an active context streaming.",
+ cmd->claim_id, cmd->node_id, rrdhost_hostname(host));
// disable it temporarily, so that our worker will not attempt to send messages in parallel
rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
@@ -252,8 +257,10 @@ void rrdcontext_hub_checkpoint_command(void *ptr) {
uint64_t our_version_hash = rrdcontext_version_hash(host);
if(cmd->version_hash != our_version_hash) {
- netdata_log_error("RRDCONTEXT: received version hash %"PRIu64" for host '%s', does not match our version hash %"PRIu64". Sending snapshot of all contexts.",
- cmd->version_hash, rrdhost_hostname(host), our_version_hash);
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "RRDCONTEXT: received version hash %"PRIu64" for host '%s', does not match our version hash %"PRIu64". "
+ "Sending snapshot of all contexts.",
+ cmd->version_hash, rrdhost_hostname(host), our_version_hash);
#ifdef ENABLE_ACLK
// prepare the snapshot
@@ -275,41 +282,55 @@ void rrdcontext_hub_checkpoint_command(void *ptr) {
#endif
}
- internal_error(true, "RRDCONTEXT: host '%s' enabling streaming of contexts", rrdhost_hostname(host));
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "RRDCONTEXT: host '%s' enabling streaming of contexts",
+ rrdhost_hostname(host));
+
rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
char node_str[UUID_STR_LEN];
uuid_unparse_lower(*host->node_id, node_str);
- netdata_log_access("ACLK REQ [%s (%s)]: STREAM CONTEXTS ENABLED", node_str, rrdhost_hostname(host));
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
+ "ACLK REQ [%s (%s)]: STREAM CONTEXTS ENABLED",
+ node_str, rrdhost_hostname(host));
}
void rrdcontext_hub_stop_streaming_command(void *ptr) {
struct stop_streaming_ctxs *cmd = ptr;
if(!rrdhost_check_our_claim_id(cmd->claim_id)) {
- netdata_log_error("RRDCONTEXT: received stop streaming command for claim_id '%s', node id '%s', but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
- cmd->claim_id, cmd->node_id,
- localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
- cmd->claim_id);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "RRDCONTEXT: received stop streaming command for claim_id '%s', node id '%s', "
+ "but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
+ cmd->claim_id, cmd->node_id,
+ localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
+ cmd->claim_id);
return;
}
RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
if(!host) {
- netdata_log_error("RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', but there is no node with such node id here. Ignoring command.",
- cmd->claim_id, cmd->node_id);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', "
+ "but there is no node with such node id here. Ignoring command.",
+ cmd->claim_id, cmd->node_id);
return;
}
if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) {
- netdata_log_error("RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', but node '%s' does not have active context streaming. Ignoring command.",
- cmd->claim_id, cmd->node_id, rrdhost_hostname(host));
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', "
+ "but node '%s' does not have active context streaming. Ignoring command.",
+ cmd->claim_id, cmd->node_id, rrdhost_hostname(host));
return;
}
- internal_error(true, "RRDCONTEXT: host '%s' disabling streaming of contexts", rrdhost_hostname(host));
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "RRDCONTEXT: host '%s' disabling streaming of contexts",
+ rrdhost_hostname(host));
+
rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
}