summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/rrdpush.c88
1 files changed, 83 insertions, 5 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 8829d1eea..77774d8d3 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -136,11 +136,7 @@ static inline int should_send_chart_matching(RRDSET *st) {
if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION))
return ml_streaming_enabled();
- if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
- rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- }
- else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+ if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
RRDHOST *host = st->rrdhost;
if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
@@ -421,6 +417,86 @@ void rrdpush_claimed_id(RRDHOST *host)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
}
+int connect_to_one_of_destinations(
+ struct rrdpush_destinations *destinations,
+ int default_port,
+ struct timeval *timeout,
+ size_t *reconnects_counter,
+ char *connected_to,
+ size_t connected_to_size,
+ struct rrdpush_destinations **destination)
+{
+ int sock = -1;
+
+ for (struct rrdpush_destinations *d = destinations; d; d = d->next) {
+ if (d->disabled_no_proper_reply) {
+ d->disabled_no_proper_reply = 0;
+ continue;
+ } else if (d->disabled_because_of_localhost) {
+ continue;
+ } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) {
+ continue;
+ } else if (d->disabled_because_of_denied_access) {
+ d->disabled_because_of_denied_access = 0;
+ continue;
+ }
+
+ if (reconnects_counter)
+ *reconnects_counter += 1;
+ sock = connect_to_this(d->destination, default_port, timeout);
+ if (sock != -1) {
+ if (connected_to && connected_to_size) {
+ strncpy(connected_to, d->destination, connected_to_size);
+ connected_to[connected_to_size - 1] = '\0';
+ }
+ *destination = d;
+ break;
+ }
+ }
+
+ return sock;
+}
+
+struct rrdpush_destinations *destinations_init(const char *dests) {
+ const char *s = dests;
+ struct rrdpush_destinations *destinations = NULL, *prev = NULL;
+ while(*s) {
+ const char *e = s;
+
+ // skip path, moving both s(tart) and e(nd)
+ if(*e == '/')
+ while(!isspace(*e) && *e != ',') s = ++e;
+
+ // skip separators, moving both s(tart) and e(nd)
+ while(isspace(*e) || *e == ',') s = ++e;
+
+ // move e(nd) to the first separator
+ while(*e && !isspace(*e) && *e != ',' && *e != '/') e++;
+
+ // is there anything?
+ if(!*s || s == e) break;
+
+ char buf[e - s + 1];
+ strncpyz(buf, s, e - s);
+ struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ strncpyz(d->destination, buf, sizeof(d->destination)-1);
+ d->disabled_no_proper_reply = 0;
+ d->disabled_because_of_localhost = 0;
+ d->disabled_already_streaming = 0;
+ d->disabled_because_of_denied_access = 0;
+ d->next = NULL;
+ if (!destinations) {
+ destinations = d;
+ } else {
+ prev->next = d;
+ }
+ prev = d;
+
+ s = e;
+ }
+ return destinations;
+}
+
// ----------------------------------------------------------------------------
// rrdpush sender thread
@@ -539,6 +615,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
system_info->ml_capable = strtoul(value, NULL, 0);
else if(!strcmp(name, "ml_enabled"))
system_info->ml_enabled = strtoul(value, NULL, 0);
+ else if(!strcmp(name, "mc_version"))
+ system_info->mc_version = strtoul(value, NULL, 0);
else if(!strcmp(name, "tags"))
tags = value;
else if(!strcmp(name, "ver"))