diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 186 |
1 files changed, 134 insertions, 52 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 62b537f0..c481871c 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -49,7 +49,6 @@ bool default_rrdpush_enable_replication = true; time_t default_rrdpush_seconds_to_replicate = 86400; time_t default_rrdpush_replication_step = 600; #ifdef ENABLE_HTTPS -int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL; char *netdata_ssl_ca_path = NULL; char *netdata_ssl_ca_file = NULL; #endif @@ -137,24 +136,10 @@ int rrdpush_init() { } #ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) { - if (default_rrdpush_destination){ - char *test = strstr(default_rrdpush_destination,":SSL"); - if(test){ - *test = 0X00; - netdata_use_ssl_on_stream = NETDATA_SSL_FORCE; - } - } - } + netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); - bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); - - if(invalid_certificate == CONFIG_BOOLEAN_YES){ - if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ - info("Netdata is configured to accept invalid SSL certificate."); - netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; - } - } + if(!netdata_ssl_validate_certificate_sender) + info("SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -390,7 +375,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); return true; @@ -458,7 +443,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); } - sender_commit(st->rrdhost->sender, rsb->wb); + sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA); *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } @@ -498,7 +483,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!exposed_upstream)) { BUFFER *wb = sender_start(host->sender); replication_in_progress = rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); } if(replication_in_progress) @@ -529,7 +514,7 @@ void rrdpush_send_host_labels(RRDHOST *host) { rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb); buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } @@ -548,7 +533,7 @@ void rrdpush_claimed_id(RRDHOST *host) buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); rrdhost_aclk_state_unlock(host); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); } @@ -579,6 +564,7 @@ int connect_to_one_of_destinations( if (reconnects_counter) *reconnects_counter += 1; + d->last_attempt = now; sock = connect_to_this(string2str(d->destination), default_port, timeout); if (sock != -1) { @@ -610,6 +596,14 @@ bool destinations_init_add_one(char *entry, void *data) { struct destinations_init_tmp *t = data; struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + char *colon_ssl = strstr(entry, ":SSL"); + if(colon_ssl) { + *colon_ssl = '\0'; + d->ssl = true; + } + else + d->ssl = false; + d->destination = string_strdupz(entry); __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); @@ -712,7 +706,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { // we always respond with the same message and error code // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); - buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info."); + buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED); return HTTP_RESP_UNAUTHORIZED; } @@ -720,10 +714,35 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) { // we always respond with the same message and error code // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); - buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later."); + buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER); return HTTP_RESP_SERVICE_UNAVAILABLE; } +static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) { + rpt->fd = w->ifd; + +#ifdef ENABLE_HTTPS + rpt->ssl.conn = w->ssl.conn; + rpt->ssl.state = w->ssl.state; + + w->ssl = NETDATA_SSL_UNSET_CONNECTION; +#endif + + WEB_CLIENT_IS_DEAD(w); + + if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { + web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); + } + else { + if(w->ifd == w->ofd) + w->ifd = w->ofd = -1; + else + w->ifd = -1; + } + + buffer_flush(w->response.data); +} + void *rrdpush_receiver_thread(void *ptr); int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) { @@ -741,20 +760,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info)); rpt->system_info->hops = rpt->hops; - rpt->fd = w->ifd; + rpt->fd = -1; rpt->client_ip = strdupz(w->client_ip); rpt->client_port = strdupz(w->client_port); - rpt->config.update_every = default_rrd_update_every; - #ifdef ENABLE_HTTPS - rpt->ssl.conn = w->ssl.conn; - rpt->ssl.flags = w->ssl.flags; - - w->ssl.conn = NULL; - w->ssl.flags = NETDATA_SSL_START; + rpt->ssl = NETDATA_SSL_UNSET_CONNECTION; #endif + rpt->config.update_every = default_rrd_update_every; + // parse the parameters and fill rpt and rpt->system_info while(decoded_query_string) { @@ -1011,6 +1026,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + rrdpush_receiver_takeover_web_connection(w, rpt); + rrdpush_receive_log_status( rpt, "machine GUID is my own", @@ -1032,9 +1049,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri ); } - close(rpt->fd); receiver_state_free(rpt); - return web_client_socket_is_now_used_for_streaming(w); + return HTTP_RESP_OK; } if(unlikely(web_client_streaming_rate_t > 0)) { @@ -1130,7 +1146,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); - buffer_strcat(w->response.data, "This GUID is already streaming to this server"); + buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING); receiver_state_free(rpt); return HTTP_RESP_CONFLICT; } @@ -1138,8 +1154,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri debug(D_SYSTEM, "starting STREAM receive thread."); - char tag[FILENAME_MAX + 1]; - snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); + rrdpush_receiver_takeover_web_connection(w, rpt); + + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname); + tag[NETDATA_THREAD_TAG_MAX] = '\0'; if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { rrdpush_receive_log_status( @@ -1154,23 +1173,86 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } // prevent the caller from closing the streaming socket - return web_client_socket_is_now_used_for_streaming(w); + return HTTP_RESP_OK; +} + +void rrdpush_reset_destinations_postpone_time(RRDHOST *host) { + struct rrdpush_destinations *d; + for (d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = 0; } +static struct { + STREAM_HANDSHAKE err; + const char *str; +} handshake_errors[] = { + { STREAM_HANDSHAKE_OK_V5, "OK_V5" }, + { STREAM_HANDSHAKE_OK_V4, "OK_V4" }, + { STREAM_HANDSHAKE_OK_V3, "OK_V3" }, + { STREAM_HANDSHAKE_OK_V2, "OK_V2" }, + { STREAM_HANDSHAKE_OK_V1, "OK_V1" }, + { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" }, + { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" }, + { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" }, + { STREAM_HANDSHAKE_ERROR_DENIED, "DENIED" }, + { STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT, "SEND TIMEOUT" }, + { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT, "RECEIVE TIMEOUT" }, + { STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE, "INVALID CERTIFICATE" }, + { STREAM_HANDSHAKE_ERROR_SSL_ERROR, "SSL ERROR" }, + { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" }, + { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" }, + { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" }, + { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" }, + { 0, NULL }, +}; + +const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) { + for(size_t i = 0; handshake_errors[i].str ; i++) { + if(handshake_error == handshake_errors[i].err) + return handshake_errors[i].str; + } + + return ""; +} + +static struct { + STREAM_CAPABILITIES cap; + const char *str; +} capability_names[] = { + { STREAM_CAP_V1, "V1" }, + { STREAM_CAP_V2, "V2" }, + { STREAM_CAP_VN, "VN" }, + { STREAM_CAP_VCAPS, "VCAPS" }, + { STREAM_CAP_HLABELS, "HLABELS" }, + { STREAM_CAP_CLAIM, "CLAIM" }, + { STREAM_CAP_CLABELS, "CLABELS" }, + { STREAM_CAP_COMPRESSION, "COMPRESSION" }, + { STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + { STREAM_CAP_REPLICATION, "REPLICATION" }, + { STREAM_CAP_BINARY, "BINARY" }, + { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + { STREAM_CAP_IEEE754, "IEEE754" }, + { 0 , NULL }, +}; + static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { - if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 "); - if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 "); - if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN "); - if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS "); - if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS "); - if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM "); - if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS "); - if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION "); - if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS "); - if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION "); - if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY "); - if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED "); - if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 "); + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) { + buffer_strcat(wb, capability_names[i].str); + buffer_strcat(wb, " "); + } + } +} + +void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) { + buffer_json_member_add_array(wb, key); + + for(size_t i = 0; capability_names[i].str ; i++) { + if(caps & capability_names[i].cap) + buffer_json_add_array_item_string(wb, capability_names[i].str); + } + + buffer_json_array_close(wb); } void log_receiver_capabilities(struct receiver_state *rpt) { |