summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/README.md10
-rw-r--r--streaming/receiver.c18
-rw-r--r--streaming/rrdpush.c37
-rw-r--r--streaming/rrdpush.h12
-rw-r--r--streaming/sender.c7
5 files changed, 63 insertions, 21 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 94ab1f2e..7f74fb31 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -6,19 +6,19 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README
# Streaming and replication
-Each Netdata is able to replicate/mirror its database to another Netdata, by streaming collected
-metrics, in real-time to it. This is quite different to [data archiving to third party time-series
+Each Netdata is able to replicate/mirror its database to another Netdata, by streaming the collected
+metrics in real-time to it. This is quite different to [data archiving to third party time-series
databases](/exporting/README.md).
When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is
-capable of:
+capable of. This includes the following:
- Visualize metrics with a dashboard
- Run health checks that trigger alarms and send alarm notifications
-- Export metrics to a external time-series database
+- Export metrics to an external time-series database
The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
-There are also **proxies**, which collects metrics from a child and sends it to a parent.
+There are also **proxy** nodes, which collects metrics from a child and sends it to a parent.
## Supported configurations
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 11191f3c..e8f8528a 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -11,6 +11,7 @@ void destroy_receiver_state(struct receiver_state *rpt) {
freez(rpt->machine_guid);
freez(rpt->os);
freez(rpt->timezone);
+ freez(rpt->abbrev_timezone);
freez(rpt->tags);
freez(rpt->client_ip);
freez(rpt->client_port);
@@ -49,7 +50,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
}
}
-#include "../collectors/plugins.d/pluginsd_parser.h"
+#include "collectors/plugins.d/pluginsd_parser.h"
PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
@@ -220,6 +221,8 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
parser->plugins_action->chart_action = &pluginsd_chart_action;
parser->plugins_action->set_action = &pluginsd_set_action;
+ parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
+ parser->plugins_action->clabel_action = &pluginsd_clabel_action;
user->parser = parser;
@@ -307,6 +310,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->machine_guid
, rpt->os
, rpt->timezone
+ , rpt->abbrev_timezone
+ , rpt->utc_offset
, rpt->tags
, rpt->program_name
, rpt->program_version
@@ -341,13 +346,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
netdata_mutex_unlock(&rpt->host->receiver_lock);
}
+#ifdef NETDATA_INTERNAL_CHECKS
int ssl = 0;
#ifdef ENABLE_HTTPS
if (rpt->ssl.conn != NULL)
ssl = 1;
#endif
-
-#ifdef NETDATA_INTERNAL_CHECKS
info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
, rpt->hostname
, rpt->client_ip
@@ -451,11 +455,11 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
-#if defined(ENABLE_ACLK) && !defined(ACLK_NG)
+#if defined(ENABLE_ACLK)
// in case we have cloud connection we inform cloud
// new slave connected
if (netdata_cloud_setting)
- aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT);
+ aclk_host_state_update(rpt->host, 1);
#endif
size_t count = streaming_parser(rpt, &cd, fp);
@@ -465,11 +469,11 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
-#if defined(ENABLE_ACLK) && !defined(ACLK_NG)
+#if defined(ENABLE_ACLK)
// in case we have cloud connection we inform cloud
// new slave connected
if (netdata_cloud_setting)
- aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT);
+ aclk_host_state_update(rpt->host, 0);
#endif
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index f54fc609..53a89769 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "../parser/parser.h"
+#include "parser/parser.h"
/*
* rrdpush
@@ -183,6 +183,24 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
return 0;
}
+// chart labels
+void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
+ struct label_index *labels_c = &st->state->labels;
+ if (labels_c) {
+ netdata_rwlock_rdlock(&host->labels.labels_rwlock);
+ struct label *lbl = labels_c->head;
+ while(lbl) {
+ buffer_sprintf(host->sender->build,
+ "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
+
+ lbl = lbl->next;
+ }
+ if (labels_c->head)
+ buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
+ netdata_rwlock_unlock(&host->labels.labels_rwlock);
+ }
+}
+
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
@@ -224,6 +242,10 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, (st->module_name)?st->module_name:""
);
+ // send the chart labels
+ if (host->sender->version >= STREAM_VERSION_CLABELS)
+ rrdpush_send_clabels(host, st);
+
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
@@ -464,13 +486,14 @@ void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
info("clients wants to STREAM metrics.");
- char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
+ char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL;
+ int32_t utc_offset = 0;
int update_every = default_rrd_update_every;
uint32_t stream_version = UINT_MAX;
char buf[GUID_LEN + 1];
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
-
+ system_info->hops = 1;
while(url) {
char *value = mystrsep(&url, "&");
if(!value || !*value) continue;
@@ -493,6 +516,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
os = value;
else if(!strcmp(name, "timezone"))
timezone = value;
+ else if(!strcmp(name, "abbrev_timezone"))
+ abbrev_timezone = value;
+ else if(!strcmp(name, "utc_offset"))
+ utc_offset = (int32_t)strtol(value, NULL, 0);
+ else if(!strcmp(name, "hops"))
+ system_info->hops = (uint16_t) strtoul(value, NULL, 0);
else if(!strcmp(name, "tags"))
tags = value;
else if(!strcmp(name, "ver"))
@@ -680,6 +709,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
rpt->machine_guid = strdupz(machine_guid);
rpt->os = strdupz(os);
rpt->timezone = strdupz(timezone);
+ rpt->abbrev_timezone = strdupz(abbrev_timezone);
+ rpt->utc_offset = utc_offset;
rpt->tags = (tags)?strdupz(tags):NULL;
rpt->client_ip = strdupz(w->client_ip);
rpt->client_port = strdupz(w->client_port);
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 225d0c29..027ccd10 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -3,17 +3,17 @@
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1
-#include "../database/rrd.h"
-#include "../libnetdata/libnetdata.h"
+#include "database/rrd.h"
+#include "libnetdata/libnetdata.h"
#include "web/server/web_client.h"
#include "daemon/common.h"
#define CONNECTED_TO_SIZE 100
-// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
-#define VERSION_GAP_FILLING 4
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4
#define STREAM_VERSION_CLAIM 3
+#define STREAM_VERSION_CLABELS 4
+#define VERSION_GAP_FILLING 5
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
@@ -72,6 +72,8 @@ struct receiver_state {
char *machine_guid;
char *os;
char *timezone; // Unused?
+ char *abbrev_timezone;
+ int32_t utc_offset;
char *tags;
char *client_ip; // Duplicated in pluginsd
char *client_port; // Duplicated in pluginsd
diff --git a/streaming/sender.c b/streaming/sender.c
index 1dee1f05..0abfac18 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -214,7 +214,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
- "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u"
+ "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -250,6 +250,9 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, default_rrd_update_every
, host->os
, host->timezone
+ , host->abbrev_timezone
+ , host->utc_offset
+ , host->system_info->hops + 1
, (host->tags) ? host->tags : ""
, STREAMING_PROTOCOL_CURRENT_VERSION
, se.os_name
@@ -424,7 +427,9 @@ void attempt_to_send(struct sender_state *s) {
rrdpush_send_labels(s->host);
+#ifdef NETDATA_INTERNAL_CHECKS
struct circular_buffer *cb = s->buffer;
+#endif
netdata_thread_disable_cancelability();
netdata_mutex_lock(&s->mutex);