diff options
Diffstat (limited to 'src/streaming/sender-destinations.c')
-rw-r--r-- | src/streaming/sender-destinations.c | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/src/streaming/sender-destinations.c b/src/streaming/sender-destinations.c new file mode 100644 index 000000000..5e67ca039 --- /dev/null +++ b/src/streaming/sender-destinations.c @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "sender-internals.h" + +void rrdpush_reset_destinations_postpone_time(RRDHOST *host) { + uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5; + time_t now = now_realtime_sec(); + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = now + wait; +} + +void rrdpush_sender_ssl_init(RRDHOST *host) { + static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; + spinlock_lock(&sp); + + if(netdata_ssl_streaming_sender_ctx || !host) { + spinlock_unlock(&sp); + return; + } + + for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) { + if (d->ssl) { + // we need to initialize SSL + + netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX); + ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, stream_conf_ssl_ca_file, stream_conf_ssl_ca_path); + + // stop the loop + break; + } + } + + spinlock_unlock(&sp); +} + +int connect_to_one_of_destinations( + RRDHOST *host, + int default_port, + struct timeval *timeout, + size_t *reconnects_counter, + char *connected_to, + size_t connected_to_size, + struct rrdpush_destinations **destination) +{ + int sock = -1; + + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) { + time_t now = now_realtime_sec(); + + if(nd_thread_signaled_to_cancel()) + return -1; + + if(d->postpone_reconnection_until > now) + continue; + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s: connecting to '%s' (default port: %d)...", + rrdhost_hostname(host), string2str(d->destination), default_port); + + if (reconnects_counter) + *reconnects_counter += 1; + + d->since = now; + d->attempts++; + sock = connect_to_this(string2str(d->destination), default_port, timeout); + + if (sock != -1) { + if (connected_to && connected_to_size) + strncpyz(connected_to, string2str(d->destination), connected_to_size); + + *destination = d; + + // move the current item to the end of the list + // without this, this destination will break the loop again and again + // not advancing the destinations to find one that may work + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next); + + break; + } + } + + return sock; +} + +struct destinations_init_tmp { + RRDHOST *host; + struct rrdpush_destinations *list; + int count; +}; + +static bool destinations_init_add_one(char *entry, void *data) { + struct destinations_init_tmp *t = data; + + struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + char *colon_ssl = strstr(entry, ":SSL"); + if(colon_ssl) { + *colon_ssl = '\0'; + d->ssl = true; + } + else + d->ssl = false; + + d->destination = string_strdupz(entry); + + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); + + t->count++; + nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + + return false; // we return false, so that we will get all defined destinations +} + +void rrdpush_destinations_init(RRDHOST *host) { + if(!host->rrdpush.send.destination) return; + + rrdpush_destinations_free(host); + + struct destinations_init_tmp t = { + .host = host, + .list = NULL, + .count = 0, + }; + + foreach_entry_in_connection_string(host->rrdpush.send.destination, destinations_init_add_one, &t); + + host->destinations = t.list; +} + +void rrdpush_destinations_free(RRDHOST *host) { + while (host->destinations) { + struct rrdpush_destinations *tmp = host->destinations; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next); + string_freez(tmp->destination); + freez(tmp); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); + } + + host->destinations = NULL; +} + |