diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/rrdpush.c | 88 |
1 files changed, 83 insertions, 5 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 8829d1eea..77774d8d3 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -136,11 +136,7 @@ static inline int should_send_chart_matching(RRDSET *st) { if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) return ml_streaming_enabled(); - if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); - rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); - } - else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { + if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { RRDHOST *host = st->rrdhost; if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) || @@ -421,6 +417,86 @@ void rrdpush_claimed_id(RRDHOST *host) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); } +int connect_to_one_of_destinations( + struct rrdpush_destinations *destinations, + int default_port, + struct timeval *timeout, + size_t *reconnects_counter, + char *connected_to, + size_t connected_to_size, + struct rrdpush_destinations **destination) +{ + int sock = -1; + + for (struct rrdpush_destinations *d = destinations; d; d = d->next) { + if (d->disabled_no_proper_reply) { + d->disabled_no_proper_reply = 0; + continue; + } else if (d->disabled_because_of_localhost) { + continue; + } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) { + continue; + } else if (d->disabled_because_of_denied_access) { + d->disabled_because_of_denied_access = 0; + continue; + } + + if (reconnects_counter) + *reconnects_counter += 1; + sock = connect_to_this(d->destination, default_port, timeout); + if (sock != -1) { + if (connected_to && connected_to_size) { + strncpy(connected_to, d->destination, connected_to_size); + connected_to[connected_to_size - 1] = '\0'; + } + *destination = d; + break; + } + } + + return sock; +} + +struct rrdpush_destinations *destinations_init(const char *dests) { + const char *s = dests; + struct rrdpush_destinations *destinations = NULL, *prev = NULL; + while(*s) { + const char *e = s; + + // skip path, moving both s(tart) and e(nd) + if(*e == '/') + while(!isspace(*e) && *e != ',') s = ++e; + + // skip separators, moving both s(tart) and e(nd) + while(isspace(*e) || *e == ',') s = ++e; + + // move e(nd) to the first separator + while(*e && !isspace(*e) && *e != ',' && *e != '/') e++; + + // is there anything? + if(!*s || s == e) break; + + char buf[e - s + 1]; + strncpyz(buf, s, e - s); + struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + strncpyz(d->destination, buf, sizeof(d->destination)-1); + d->disabled_no_proper_reply = 0; + d->disabled_because_of_localhost = 0; + d->disabled_already_streaming = 0; + d->disabled_because_of_denied_access = 0; + d->next = NULL; + if (!destinations) { + destinations = d; + } else { + prev->next = d; + } + prev = d; + + s = e; + } + return destinations; +} + // ---------------------------------------------------------------------------- // rrdpush sender thread @@ -539,6 +615,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { system_info->ml_capable = strtoul(value, NULL, 0); else if(!strcmp(name, "ml_enabled")) system_info->ml_enabled = strtoul(value, NULL, 0); + else if(!strcmp(name, "mc_version")) + system_info->mc_version = strtoul(value, NULL, 0); else if(!strcmp(name, "tags")) tags = value; else if(!strcmp(name, "ver")) |