summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-06-14 19:20:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-06-14 19:20:36 +0000
commitdd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b (patch)
tree1e52f4dac2622ab377c7649f218fb49003b4cbb9 /streaming/rrdpush.c
parentReleasing debian version 1.39.1-2. (diff)
downloadnetdata-dd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b.tar.xz
netdata-dd24e74edfbafc09eaeb2dde0fda7eb3e1e86d0b.zip
Merging upstream version 1.40.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c186
1 files changed, 134 insertions, 52 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 62b537f0..c481871c 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -49,7 +49,6 @@ bool default_rrdpush_enable_replication = true;
time_t default_rrdpush_seconds_to_replicate = 86400;
time_t default_rrdpush_replication_step = 600;
#ifdef ENABLE_HTTPS
-int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
char *netdata_ssl_ca_path = NULL;
char *netdata_ssl_ca_file = NULL;
#endif
@@ -137,24 +136,10 @@ int rrdpush_init() {
}
#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
- if (default_rrdpush_destination){
- char *test = strstr(default_rrdpush_destination,":SSL");
- if(test){
- *test = 0X00;
- netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
- }
- }
- }
+ netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
- bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
-
- if(invalid_certificate == CONFIG_BOOLEAN_YES){
- if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
- info("Netdata is configured to accept invalid SSL certificate.");
- netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
- }
- }
+ if(!netdata_ssl_validate_certificate_sender)
+ info("SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -390,7 +375,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
return true;
@@ -458,7 +443,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
}
- sender_commit(st->rrdhost->sender, rsb->wb);
+ sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA);
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
@@ -498,7 +483,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!exposed_upstream)) {
BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
}
if(replication_in_progress)
@@ -529,7 +514,7 @@ void rrdpush_send_host_labels(RRDHOST *host) {
rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
@@ -548,7 +533,7 @@ void rrdpush_claimed_id(RRDHOST *host)
buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
rrdhost_aclk_state_unlock(host);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
@@ -579,6 +564,7 @@ int connect_to_one_of_destinations(
if (reconnects_counter)
*reconnects_counter += 1;
+ d->last_attempt = now;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
@@ -610,6 +596,14 @@ bool destinations_init_add_one(char *entry, void *data) {
struct destinations_init_tmp *t = data;
struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ char *colon_ssl = strstr(entry, ":SSL");
+ if(colon_ssl) {
+ *colon_ssl = '\0';
+ d->ssl = true;
+ }
+ else
+ d->ssl = false;
+
d->destination = string_strdupz(entry);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
@@ -712,7 +706,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
// we always respond with the same message and error code
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED);
return HTTP_RESP_UNAUTHORIZED;
}
@@ -720,10 +714,35 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
// we always respond with the same message and error code
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER);
return HTTP_RESP_SERVICE_UNAVAILABLE;
}
+static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) {
+ rpt->fd = w->ifd;
+
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.state = w->ssl.state;
+
+ w->ssl = NETDATA_SSL_UNSET_CONNECTION;
+#endif
+
+ WEB_CLIENT_IS_DEAD(w);
+
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
+
+ buffer_flush(w->response.data);
+}
+
void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
@@ -741,20 +760,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
rpt->system_info->hops = rpt->hops;
- rpt->fd = w->ifd;
+ rpt->fd = -1;
rpt->client_ip = strdupz(w->client_ip);
rpt->client_port = strdupz(w->client_port);
- rpt->config.update_every = default_rrd_update_every;
-
#ifdef ENABLE_HTTPS
- rpt->ssl.conn = w->ssl.conn;
- rpt->ssl.flags = w->ssl.flags;
-
- w->ssl.conn = NULL;
- w->ssl.flags = NETDATA_SSL_START;
+ rpt->ssl = NETDATA_SSL_UNSET_CONNECTION;
#endif
+ rpt->config.update_every = default_rrd_update_every;
+
// parse the parameters and fill rpt and rpt->system_info
while(decoded_query_string) {
@@ -1011,6 +1026,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
rrdpush_receive_log_status(
rpt,
"machine GUID is my own",
@@ -1032,9 +1049,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
);
}
- close(rpt->fd);
receiver_state_free(rpt);
- return web_client_socket_is_now_used_for_streaming(w);
+ return HTTP_RESP_OK;
}
if(unlikely(web_client_streaming_rate_t > 0)) {
@@ -1130,7 +1146,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
- buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING);
receiver_state_free(rpt);
return HTTP_RESP_CONFLICT;
}
@@ -1138,8 +1154,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
debug(D_SYSTEM, "starting STREAM receive thread.");
- char tag[FILENAME_MAX + 1];
- snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+ rrdpush_receiver_takeover_web_connection(w, rpt);
+
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname);
+ tag[NETDATA_THREAD_TAG_MAX] = '\0';
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
@@ -1154,23 +1173,86 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
// prevent the caller from closing the streaming socket
- return web_client_socket_is_now_used_for_streaming(w);
+ return HTTP_RESP_OK;
+}
+
+void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
+ struct rrdpush_destinations *d;
+ for (d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = 0;
}
+static struct {
+ STREAM_HANDSHAKE err;
+ const char *str;
+} handshake_errors[] = {
+ { STREAM_HANDSHAKE_OK_V5, "OK_V5" },
+ { STREAM_HANDSHAKE_OK_V4, "OK_V4" },
+ { STREAM_HANDSHAKE_OK_V3, "OK_V3" },
+ { STREAM_HANDSHAKE_OK_V2, "OK_V2" },
+ { STREAM_HANDSHAKE_OK_V1, "OK_V1" },
+ { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
+ { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
+ { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
+ { STREAM_HANDSHAKE_ERROR_DENIED, "DENIED" },
+ { STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT, "SEND TIMEOUT" },
+ { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT, "RECEIVE TIMEOUT" },
+ { STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE, "INVALID CERTIFICATE" },
+ { STREAM_HANDSHAKE_ERROR_SSL_ERROR, "SSL ERROR" },
+ { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" },
+ { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" },
+ { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" },
+ { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" },
+ { 0, NULL },
+};
+
+const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
+ for(size_t i = 0; handshake_errors[i].str ; i++) {
+ if(handshake_error == handshake_errors[i].err)
+ return handshake_errors[i].str;
+ }
+
+ return "";
+}
+
+static struct {
+ STREAM_CAPABILITIES cap;
+ const char *str;
+} capability_names[] = {
+ { STREAM_CAP_V1, "V1" },
+ { STREAM_CAP_V2, "V2" },
+ { STREAM_CAP_VN, "VN" },
+ { STREAM_CAP_VCAPS, "VCAPS" },
+ { STREAM_CAP_HLABELS, "HLABELS" },
+ { STREAM_CAP_CLAIM, "CLAIM" },
+ { STREAM_CAP_CLABELS, "CLABELS" },
+ { STREAM_CAP_COMPRESSION, "COMPRESSION" },
+ { STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
+ { STREAM_CAP_REPLICATION, "REPLICATION" },
+ { STREAM_CAP_BINARY, "BINARY" },
+ { STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
+ { STREAM_CAP_IEEE754, "IEEE754" },
+ { 0 , NULL },
+};
+
static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
- if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 ");
- if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 ");
- if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN ");
- if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS ");
- if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS ");
- if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM ");
- if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS ");
- if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION ");
- if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
- if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
- if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
- if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
- if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap) {
+ buffer_strcat(wb, capability_names[i].str);
+ buffer_strcat(wb, " ");
+ }
+ }
+}
+
+void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
+ buffer_json_member_add_array(wb, key);
+
+ for(size_t i = 0; capability_names[i].str ; i++) {
+ if(caps & capability_names[i].cap)
+ buffer_json_add_array_item_string(wb, capability_names[i].str);
+ }
+
+ buffer_json_array_close(wb);
}
void log_receiver_capabilities(struct receiver_state *rpt) {