diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/README.md | 10 | ||||
-rw-r--r-- | streaming/receiver.c | 18 | ||||
-rw-r--r-- | streaming/rrdpush.c | 37 | ||||
-rw-r--r-- | streaming/rrdpush.h | 12 | ||||
-rw-r--r-- | streaming/sender.c | 7 |
5 files changed, 63 insertions, 21 deletions
diff --git a/streaming/README.md b/streaming/README.md index 94ab1f2e..7f74fb31 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -6,19 +6,19 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README # Streaming and replication -Each Netdata is able to replicate/mirror its database to another Netdata, by streaming collected -metrics, in real-time to it. This is quite different to [data archiving to third party time-series +Each Netdata is able to replicate/mirror its database to another Netdata, by streaming the collected +metrics in real-time to it. This is quite different to [data archiving to third party time-series databases](/exporting/README.md). When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is -capable of: +capable of. This includes the following: - Visualize metrics with a dashboard - Run health checks that trigger alarms and send alarm notifications -- Export metrics to a external time-series database +- Export metrics to an external time-series database The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes. -There are also **proxies**, which collects metrics from a child and sends it to a parent. +There are also **proxy** nodes, which collects metrics from a child and sends it to a parent. ## Supported configurations diff --git a/streaming/receiver.c b/streaming/receiver.c index 11191f3c..e8f8528a 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -11,6 +11,7 @@ void destroy_receiver_state(struct receiver_state *rpt) { freez(rpt->machine_guid); freez(rpt->os); freez(rpt->timezone); + freez(rpt->abbrev_timezone); freez(rpt->tags); freez(rpt->client_ip); freez(rpt->client_port); @@ -49,7 +50,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { } } -#include "../collectors/plugins.d/pluginsd_parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action) { @@ -220,6 +221,8 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; parser->plugins_action->chart_action = &pluginsd_chart_action; parser->plugins_action->set_action = &pluginsd_set_action; + parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; + parser->plugins_action->clabel_action = &pluginsd_clabel_action; user->parser = parser; @@ -307,6 +310,8 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->machine_guid , rpt->os , rpt->timezone + , rpt->abbrev_timezone + , rpt->utc_offset , rpt->tags , rpt->program_name , rpt->program_version @@ -341,13 +346,12 @@ static int rrdpush_receive(struct receiver_state *rpt) netdata_mutex_unlock(&rpt->host->receiver_lock); } +#ifdef NETDATA_INTERNAL_CHECKS int ssl = 0; #ifdef ENABLE_HTTPS if (rpt->ssl.conn != NULL) ssl = 1; #endif - -#ifdef NETDATA_INTERNAL_CHECKS info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" , rpt->hostname , rpt->client_ip @@ -451,11 +455,11 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; -#if defined(ENABLE_ACLK) && !defined(ACLK_NG) +#if defined(ENABLE_ACLK) // in case we have cloud connection we inform cloud // new slave connected if (netdata_cloud_setting) - aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT); + aclk_host_state_update(rpt->host, 1); #endif size_t count = streaming_parser(rpt, &cd, fp); @@ -465,11 +469,11 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); -#if defined(ENABLE_ACLK) && !defined(ACLK_NG) +#if defined(ENABLE_ACLK) // in case we have cloud connection we inform cloud // new slave connected if (netdata_cloud_setting) - aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT); + aclk_host_state_update(rpt->host, 0); #endif // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index f54fc609..53a89769 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" -#include "../parser/parser.h" +#include "parser/parser.h" /* * rrdpush @@ -183,6 +183,24 @@ static inline int need_to_send_chart_definition(RRDSET *st) { return 0; } +// chart labels +void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { + struct label_index *labels_c = &st->state->labels; + if (labels_c) { + netdata_rwlock_rdlock(&host->labels.labels_rwlock); + struct label *lbl = labels_c->head; + while(lbl) { + buffer_sprintf(host->sender->build, + "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source); + + lbl = lbl->next; + } + if (labels_c->head) + buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); + netdata_rwlock_unlock(&host->labels.labels_rwlock); + } +} + // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { @@ -224,6 +242,10 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { , (st->module_name)?st->module_name:"" ); + // send the chart labels + if (host->sender->version >= STREAM_VERSION_CLABELS) + rrdpush_send_clabels(host, st); + // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { @@ -464,13 +486,14 @@ void *rrdpush_receiver_thread(void *ptr); int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { info("clients wants to STREAM metrics."); - char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL; + char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL; + int32_t utc_offset = 0; int update_every = default_rrd_update_every; uint32_t stream_version = UINT_MAX; char buf[GUID_LEN + 1]; struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); - + system_info->hops = 1; while(url) { char *value = mystrsep(&url, "&"); if(!value || !*value) continue; @@ -493,6 +516,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { os = value; else if(!strcmp(name, "timezone")) timezone = value; + else if(!strcmp(name, "abbrev_timezone")) + abbrev_timezone = value; + else if(!strcmp(name, "utc_offset")) + utc_offset = (int32_t)strtol(value, NULL, 0); + else if(!strcmp(name, "hops")) + system_info->hops = (uint16_t) strtoul(value, NULL, 0); else if(!strcmp(name, "tags")) tags = value; else if(!strcmp(name, "ver")) @@ -680,6 +709,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { rpt->machine_guid = strdupz(machine_guid); rpt->os = strdupz(os); rpt->timezone = strdupz(timezone); + rpt->abbrev_timezone = strdupz(abbrev_timezone); + rpt->utc_offset = utc_offset; rpt->tags = (tags)?strdupz(tags):NULL; rpt->client_ip = strdupz(w->client_ip); rpt->client_port = strdupz(w->client_port); diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 225d0c29..027ccd10 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -3,17 +3,17 @@ #ifndef NETDATA_RRDPUSH_H #define NETDATA_RRDPUSH_H 1 -#include "../database/rrd.h" -#include "../libnetdata/libnetdata.h" +#include "database/rrd.h" +#include "libnetdata/libnetdata.h" #include "web/server/web_client.h" #include "daemon/common.h" #define CONNECTED_TO_SIZE 100 -// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 -#define VERSION_GAP_FILLING 4 +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 #define STREAM_VERSION_CLAIM 3 +#define STREAM_VERSION_CLABELS 4 +#define VERSION_GAP_FILLING 5 #define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." @@ -72,6 +72,8 @@ struct receiver_state { char *machine_guid; char *os; char *timezone; // Unused? + char *abbrev_timezone; + int32_t utc_offset; char *tags; char *client_ip; // Duplicated in pluginsd char *client_port; // Duplicated in pluginsd diff --git a/streaming/sender.c b/streaming/sender.c index 1dee1f05..0abfac18 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -214,7 +214,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po char http[HTTP_HEADER_SIZE + 1]; int eol = snprintfz(http, HTTP_HEADER_SIZE, - "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u" + "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u" "&NETDATA_SYSTEM_OS_NAME=%s" "&NETDATA_SYSTEM_OS_ID=%s" "&NETDATA_SYSTEM_OS_ID_LIKE=%s" @@ -250,6 +250,9 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po , default_rrd_update_every , host->os , host->timezone + , host->abbrev_timezone + , host->utc_offset + , host->system_info->hops + 1 , (host->tags) ? host->tags : "" , STREAMING_PROTOCOL_CURRENT_VERSION , se.os_name @@ -424,7 +427,9 @@ void attempt_to_send(struct sender_state *s) { rrdpush_send_labels(s->host); +#ifdef NETDATA_INTERNAL_CHECKS struct circular_buffer *cb = s->buffer; +#endif netdata_thread_disable_cancelability(); netdata_mutex_lock(&s->mutex); |