1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
// SPDX-License-Identifier: GPL-3.0-or-later
#include "sender-internals.h"
void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5;
time_t now = now_realtime_sec();
for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
d->postpone_reconnection_until = now + wait;
}
void rrdpush_sender_ssl_init(RRDHOST *host) {
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
spinlock_lock(&sp);
if(netdata_ssl_streaming_sender_ctx || !host) {
spinlock_unlock(&sp);
return;
}
for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
if (d->ssl) {
// we need to initialize SSL
netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, stream_conf_ssl_ca_file, stream_conf_ssl_ca_path);
// stop the loop
break;
}
}
spinlock_unlock(&sp);
}
int connect_to_one_of_destinations(
RRDHOST *host,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
char *connected_to,
size_t connected_to_size,
struct rrdpush_destinations **destination)
{
int sock = -1;
for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
time_t now = now_realtime_sec();
if(nd_thread_signaled_to_cancel())
return -1;
if(d->postpone_reconnection_until > now)
continue;
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host), string2str(d->destination), default_port);
if (reconnects_counter)
*reconnects_counter += 1;
d->since = now;
d->attempts++;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
if (connected_to && connected_to_size)
strncpyz(connected_to, string2str(d->destination), connected_to_size);
*destination = d;
// move the current item to the end of the list
// without this, this destination will break the loop again and again
// not advancing the destinations to find one that may work
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next);
break;
}
}
return sock;
}
struct destinations_init_tmp {
RRDHOST *host;
struct rrdpush_destinations *list;
int count;
};
static bool destinations_init_add_one(char *entry, void *data) {
struct destinations_init_tmp *t = data;
struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
char *colon_ssl = strstr(entry, ":SSL");
if(colon_ssl) {
*colon_ssl = '\0';
d->ssl = true;
}
else
d->ssl = false;
d->destination = string_strdupz(entry);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
void rrdpush_destinations_init(RRDHOST *host) {
if(!host->rrdpush.send.destination) return;
rrdpush_destinations_free(host);
struct destinations_init_tmp t = {
.host = host,
.list = NULL,
.count = 0,
};
foreach_entry_in_connection_string(host->rrdpush.send.destination, destinations_init_add_one, &t);
host->destinations = t.list;
}
void rrdpush_destinations_free(RRDHOST *host) {
while (host->destinations) {
struct rrdpush_destinations *tmp = host->destinations;
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next);
string_freez(tmp->destination);
freez(tmp);
__atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
}
host->destinations = NULL;
}
|