diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/README.md | 14 | ||||
-rw-r--r-- | streaming/compression.c | 2 | ||||
-rw-r--r-- | streaming/receiver.c | 627 | ||||
-rw-r--r-- | streaming/replication.c | 1104 | ||||
-rw-r--r-- | streaming/replication.h | 5 | ||||
-rw-r--r-- | streaming/rrdpush.c | 619 | ||||
-rw-r--r-- | streaming/rrdpush.h | 66 | ||||
-rw-r--r-- | streaming/sender.c | 275 |
8 files changed, 1741 insertions, 971 deletions
diff --git a/streaming/README.md b/streaming/README.md index 58eb2cc1..37d2c261 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -7,7 +7,7 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README Each Netdata node is able to replicate/mirror its database to another Netdata node, by streaming the collected metrics in real-time. This is quite different to [data archiving to third party time-series -databases](/exporting/README.md). +databases](https://github.com/netdata/netdata/blob/master/exporting/README.md). The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes. There are also **proxy** nodes, which collect metrics from a child and sends it to a parent. @@ -38,7 +38,7 @@ In a headless setup, the child acts as a plain data collector. It spawns all ext local database and accepting dashboard requests, it streams all metrics to the parent. This setup works great to reduce the memory footprint. Depending on the enabled plugins, memory usage is between 6 MiB and 40 MiB. To reduce the memory usage as much as -possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md). +possible, refer to the [performance optimization guide](https://github.com/netdata/netdata/blob/master/docs/guides/configure/performance.md). ### Database Replication @@ -107,7 +107,7 @@ This also disables the registry (there cannot be a registry without an API). requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log entries "... too busy to accept new streaming request. Will be allowed in X secs". -You can [use](/exporting/README.md#configuration) the exporting engine to configure data archiving to an external database (it archives all databases maintained on +You can [use](https://github.com/netdata/netdata/blob/master/exporting/README.md#configuration) the exporting engine to configure data archiving to an external database (it archives all databases maintained on this host). ### Streaming configuration @@ -198,7 +198,7 @@ You can also use `default memory mode = dbengine` for an API key or `memory mode ##### Allow from -`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches +`allow from` settings are [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md): string matches that use `*` as wildcard (any number of times) and a `!` prefix for a negative match. So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is important: left to right, the first positive or negative match is used. @@ -233,7 +233,7 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`. ### Securing streaming communications Netdata does not activate TLS encryption by default. To encrypt streaming connections: -1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support). +1. On the parent node (receiving node), [enable TLS support](https://github.com/netdata/netdata/blob/master/web/server/README.md#enabling-tls-support). 2. On the child's `stream.conf`, configure the destination as follows: ``` @@ -602,7 +602,7 @@ this writing, Netdata supports: - json document DBs - all the compatibles to the above (e.g. kairosdb, influxdb, etc) -Check the Netdata [exporting documentation](/docs/export/external-databases.md) for configuring this. +Check the Netdata [exporting documentation](https://github.com/netdata/netdata/blob/master/docs/export/external-databases.md) for configuring this. This is how such a solution will work: @@ -696,7 +696,7 @@ ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HO Chart data needs to be consistent between child and parent nodes. If there are differences between chart data on a parent and a child, such as gaps in metrics collection, it most often means your child's `memory mode` does not match the parent's. To learn more about the different ways Netdata can store metrics, and thus keep chart -data consistent, read our [memory mode documentation](/database/README.md). +data consistent, read our [memory mode documentation](https://github.com/netdata/netdata/blob/master/database/README.md). ### Forbidding access diff --git a/streaming/compression.c b/streaming/compression.c index 7ba9dbf1..8f2517a8 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -244,7 +244,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state, cons , state->stream->size , state->stream->write_at , decompressed_size - , state->stream->write_at + decompressed_size - state->stream->size + , (size_t)(state->stream->write_at + decompressed_size - state->stream->size) ); state->stream->write_at += decompressed_size; diff --git a/streaming/receiver.c b/streaming/receiver.c index 61ee33bc..95652942 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -16,7 +16,8 @@ extern struct config stream_config; -void destroy_receiver_state(struct receiver_state *rpt) { +void receiver_state_free(struct receiver_state *rpt) { + freez(rpt->key); freez(rpt->hostname); freez(rpt->registry_hostname); @@ -29,43 +30,23 @@ void destroy_receiver_state(struct receiver_state *rpt) { freez(rpt->client_port); freez(rpt->program_name); freez(rpt->program_version); + #ifdef ENABLE_HTTPS - if(rpt->ssl.conn){ + if(rpt->ssl.conn) SSL_free(rpt->ssl.conn); - } #endif + #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->destroy(&rpt->decompressor); #endif - freez(rpt); -} - -static void rrdpush_receiver_thread_cleanup(void *ptr) { - worker_unregister(); - static __thread int executed = 0; - if(!executed) { - executed = 1; - struct receiver_state *rpt = (struct receiver_state *) ptr; - // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch - // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). - if (netdata_exit && rpt->host) { - rpt->exited = 1; - return; - } + if(rpt->system_info) + rrdhost_system_info_free(rpt->system_info); - // Make sure that we detach this thread and don't kill a freshly arriving receiver - if (!netdata_exit && rpt->host) { - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) - rpt->host->receiver = NULL; - netdata_mutex_unlock(&rpt->host->receiver_lock); - } + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); - info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - destroy_receiver_state(rpt); - } + freez(rpt); } #include "collectors/plugins.d/pluginsd_parser.h" @@ -105,11 +86,10 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - - metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); - rrdhost_aclk_state_unlock(host); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); + rrdpush_claimed_id(host); return PARSER_RC_OK; @@ -350,11 +330,13 @@ static void streaming_parser_thread_cleanup(void *ptr) { parser_destroy(parser); } +bool plugin_is_enabled(struct plugind *cd); + static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { size_t result; PARSER_USER_OBJECT user = { - .enabled = cd->enabled, + .enabled = plugin_is_enabled(cd), .host = rpt->host, .opaque = rpt, .cd = cd, @@ -390,39 +372,50 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(!netdata_exit) { + while(service_running(SERVICE_STREAMING)) { + netdata_thread_testcancel(); + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { bool have_new_data; - if(compressed_connection) + if(likely(compressed_connection)) have_new_data = receiver_read_compressed(rpt); else have_new_data = receiver_read_uncompressed(rpt); - if(!have_new_data) + if(unlikely(!have_new_data)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SOCKET READ ERROR"; + break; + } rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(netdata_exit)) { - internal_error(true, "exiting..."); + if(unlikely(!service_running(SERVICE_STREAMING))) { + if(!rpt->exit.reason) + rpt->exit.reason = "NETDATA EXIT"; goto done; } - if(unlikely(rpt->shutdown)) { - internal_error(true, "parser shutdown..."); + if(unlikely(rpt->exit.shutdown)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SHUTDOWN REQUESTED"; + goto done; } if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + + if(!rpt->exit.reason) + rpt->exit.reason = "PARSER FAILED"; + break; } } done: - internal_error(true, "Streaming receiver thread stopping..."); - result = user.count; // free parser with the pop function @@ -431,103 +424,240 @@ done: return result; } -static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { +static void rrdpush_receiver_replication_reset(RRDHOST *host) { RRDSET *st; - rrdset_foreach_read(st, rpt->host) { + rrdset_foreach_read(st, host) { rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); } rrdset_foreach_done(st); - rrdhost_receiver_replicating_charts_zero(rpt->host); + rrdhost_receiver_replicating_charts_zero(host); +} + +bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { + bool signal_rrdcontext = false; + bool set_this = false; + + netdata_mutex_lock(&host->receiver_lock); + + if (!host->receiver || host->receiver == rpt) { + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + + host->receiver = rpt; + rpt->host = host; + + host->child_connect_time = now_realtime_sec(); + host->child_disconnected_time = 0; + host->child_last_chart_command = 0; + host->trigger_chart_obsoletion_check = 1; + + if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { + if (rpt->config.alarms_delay > 0) { + host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); + } + } + +// this is a test +// if(rpt->hops <= host->sender->hops) +// rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false); + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + set_this = true; + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_connected(host); + + return set_this; +} + +static void rrdhost_clear_receiver(struct receiver_state *rpt) { + bool signal_rrdcontext = false; + + RRDHOST *host = rpt->host; + if(host) { + netdata_mutex_lock(&host->receiver_lock); + + // Make sure that we detach this thread and don't kill a freshly arriving receiver + if(host->receiver == rpt) { + host->trigger_chart_obsoletion_check = 0; + host->child_connect_time = 0; + host->child_disconnected_time = now_realtime_sec(); + + if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) + host->health.health_enabled = 0; + + rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + if (host->receiver == rpt) + host->receiver = NULL; + + rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_disconnected(host); + } +} + +bool stop_streaming_receiver(RRDHOST *host, const char *reason) { + bool ret = false; + + netdata_mutex_lock(&host->receiver_lock); + + if(host->receiver) { + if(!host->receiver->exit.shutdown) { + host->receiver->exit.shutdown = true; + host->receiver->exit.reason = reason; + shutdown(host->receiver->fd, SHUT_RDWR); + } + + netdata_thread_cancel(host->receiver->thread); + } + + int count = 2000; + while (host->receiver && count-- > 0) { + netdata_mutex_unlock(&host->receiver_lock); + + // let the lock for the receiver thread to exit + sleep_usec(1 * USEC_PER_MS); + + netdata_mutex_lock(&host->receiver_lock); + } + + if(host->receiver) + error("STREAM '%s' [receive from [%s]:%s]: " + "thread %d takes too long to stop, giving up..." + , rrdhost_hostname(host) + , host->receiver->client_ip, host->receiver->client_port + , gettid()); + else + ret = true; + + netdata_mutex_unlock(&host->receiver_lock); + + return ret; +} + +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { + + log_stream_connection(rpt->client_ip, rpt->client_port, + (rpt->key && *rpt->key)? rpt->key : "-", + (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", + (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", + status); + + info("STREAM '%s' [receive from [%s]:%s]: " + "%s. " + "STATUS: %s%s%s%s" + , rpt->hostname + , rpt->client_ip, rpt->client_port + , msg + , status + , rpt->exit.reason?" (":"" + , rpt->exit.reason?rpt->exit.reason:"" + , rpt->exit.reason?")":"" + ); + +} + +static void rrdhost_reset_destinations(RRDHOST *host) { + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = 0; } static int rrdpush_receive(struct receiver_state *rpt) { - int history = default_rrd_history_entries; - RRD_MEMORY_MODE mode = default_rrd_memory_mode; - int health_enabled = default_health_enabled; - int rrdpush_enabled = default_rrdpush_enabled; - char *rrdpush_destination = default_rrdpush_destination; - char *rrdpush_api_key = default_rrdpush_api_key; - char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; - bool rrdpush_enable_replication = default_rrdpush_enable_replication; - time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; - time_t rrdpush_replication_step = default_rrdpush_replication_step; - time_t alarms_delay = 60; - - rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); - if(rpt->update_every < 0) rpt->update_every = 1; - - history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history); - history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history); - if(history < 5) history = 5; - - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); - - if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); - mode = default_rrd_memory_mode; + rpt->config.mode = default_rrd_memory_mode; + rpt->config.history = default_rrd_history_entries; + + rpt->config.health_enabled = (int)default_health_enabled; + rpt->config.alarms_delay = 60; + + rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; + rpt->config.rrdpush_destination = default_rrdpush_destination; + rpt->config.rrdpush_api_key = default_rrdpush_api_key; + rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + + rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication; + rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + rpt->config.rrdpush_replication_step = default_rrdpush_replication_step; + + rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every); + if(rpt->config.update_every < 0) rpt->config.update_every = 1; + + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history); + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history); + if(rpt->config.history < 5) rpt->config.history = 5; + + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode))); + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); + + if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM '%s' [receive from %s:%s]: " + "dbengine is not enabled, falling back to default." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + + rpt->config.mode = default_rrd_memory_mode; } - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled); - alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay); - alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); - rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination); - rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination); - rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key); - rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); #ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression = default_compression_enabled; - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); - rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); + rpt->config.rrdpush_compression = default_compression_enabled; + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); + rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); #endif //ENABLE_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); - if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); - error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); - char initial_response[HTTP_HEADER_SIZE + 1]; - snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); -#ifdef ENABLE_HTTPS - if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { -#else - if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { -#endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - close(rpt->fd); - return 0; - } - close(rpt->fd); - return 0; - } - - if (rpt->host==NULL) { - - rpt->host = rrdhost_find_or_create( + // find the host for this receiver + { + // this will also update the host with our system_info + RRDHOST *host = rrdhost_find_or_create( rpt->hostname , rpt->registry_hostname , rpt->machine_guid @@ -538,76 +668,41 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->tags , rpt->program_name , rpt->program_version - , rpt->update_every - , history - , mode - , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) - , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) - , rrdpush_destination - , rrdpush_api_key - , rrdpush_send_charts_matching - , rrdpush_enable_replication - , rrdpush_seconds_to_replicate - , rrdpush_replication_step + , rpt->config.update_every + , rpt->config.history + , rpt->config.mode + , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) + , rpt->config.rrdpush_destination + , rpt->config.rrdpush_api_key + , rpt->config.rrdpush_send_charts_matching + , rpt->config.rrdpush_enable_replication + , rpt->config.rrdpush_seconds_to_replicate + , rpt->config.rrdpush_replication_step , rpt->system_info , 0 ); - if(!rpt->host) { + if(!host) { + rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST"); - error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port); return 1; } - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == NULL) - rpt->host->receiver = rpt; - else { - error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid); - netdata_mutex_unlock(&rpt->host->receiver_lock); + // system_info has been consumed by the host structure + rpt->system_info = NULL; + + if(!rrdhost_set_receiver(host, rpt)) { + rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION"); return 1; } - netdata_mutex_unlock(&rpt->host->receiver_lock); - } - else { - rrd_wrlock(); - rrdhost_update( - rpt->host, - rpt->hostname, - rpt->registry_hostname, - rpt->machine_guid, - rpt->os, - rpt->timezone, - rpt->abbrev_timezone, - rpt->utc_offset, - rpt->tags, - rpt->program_name, - rpt->program_version, - rpt->update_every, - history, - mode, - (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO), - (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key), - rrdpush_destination, - rrdpush_api_key, - rrdpush_send_charts_matching, - rrdpush_enable_replication, - rrdpush_seconds_to_replicate, - rrdpush_replication_step, - rpt->system_info); - rrd_unlock(); } #ifdef NETDATA_INTERNAL_CHECKS - int ssl = 0; -#ifdef ENABLE_HTTPS - if (rpt->ssl.conn != NULL) - ssl = 1; -#endif - info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + info("STREAM '%s' [receive from [%s]:%s]: " + "client willing to stream metrics for host '%s' with machine_guid '%s': " + "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -616,20 +711,26 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) - , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") - , ssl ? " SSL," : "" + , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") +#ifdef ENABLE_HTTPS + , (rpt->ssl.conn != NULL) ? " SSL," : "" +#else + , "" +#endif , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS struct plugind cd = { - .enabled = 1, .update_every = default_rrd_update_every, - .pid = 0, .serial_failures = 0, .successful_collections = 0, - .obsolete = 0, + .unsafe = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .running = true, + .enabled = true, + }, .started_t = now_realtime_sec(), .next = NULL, .capabilities = 0, @@ -648,76 +749,60 @@ static int rrdpush_receive(struct receiver_state *rpt) } #endif - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - char initial_response[HTTP_HEADER_SIZE]; - if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); - } - else if (stream_has_capability(rpt, STREAM_CAP_VN)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); - } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); - } else { // stream_has_capability(rpt, STREAM_CAP_V1) - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); - } - debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + { + // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } + else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); + } + else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); + } + + debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + if(send_timeout( #ifdef ENABLE_HTTPS - if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { -#else - if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { + &rpt->ssl, #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - close(rpt->fd); - return 0; - } + rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - // remove the non-blocking flag from the socket - if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); - - struct timeval timeout; - timeout.tv_sec = 600; - timeout.tv_usec = 0; - if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); - - rrdhost_wrlock(rpt->host); -/* if(rpt->host->connected_senders > 0) { - rrdhost_unlock(rpt->host); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED"); - info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port); - fclose(fp); - return 0; - } -*/ - -// rpt->host->connected_senders++; - if(health_enabled != CONFIG_BOOLEAN_NO) { - if(alarms_delay > 0) { - rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; - log_health( - "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", - rrdhost_hostname(rpt->host), - (int64_t)alarms_delay); + rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); + close(rpt->fd); + return 0; } } - rpt->host->senders_connect_time = now_realtime_sec(); - rpt->host->senders_last_chart_command = 0; - rpt->host->trigger_chart_obsoletion_check = 1; - rrdhost_unlock(rpt->host); + { + // remove the non-blocking flag from the socket + if(sock_delnonblock(rpt->fd) < 0) + error("STREAM '%s' [receive from [%s]:%s]: " + "cannot remove the non-blocking flag from socket %d" + , rrdhost_hostname(rpt->host) + , rpt->client_ip, rpt->client_port + , rpt->fd); - // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", - rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + struct timeval timeout; + timeout.tv_sec = 600; + timeout.tv_usec = 0; + if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) + error("STREAM '%s' [receive from [%s]:%s]: " + "cannot set timeout for socket %d" + , rrdhost_hostname(rpt->host) + , rpt->client_ip, rpt->client_port + , rpt->fd); + } - log_stream_connection(rpt->client_ip, rpt->client_port, - rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); cd.capabilities = rpt->capabilities; @@ -728,12 +813,10 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 1); #endif - rrdhost_set_is_parent_label(++localhost->senders_count); + rrdhost_set_is_parent_label(++localhost->connected_children_count); - rrdpush_receiver_replication_reset(rpt); - rrdcontext_host_child_connected(rpt->host); - - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + // let it reconnect to parent immediately + rrdhost_reset_destinations(rpt->host); size_t count = streaming_parser(rpt, &cd, rpt->fd, #ifdef ENABLE_HTTPS @@ -745,15 +828,14 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - log_stream_connection(rpt->client_ip, rpt->client_port, - rpt->key, rpt->host->machine_guid, rpt->hostname, - "DISCONNECTED"); + if(!rpt->exit.reason) + rpt->exit.reason = "PARSER EXIT"; - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", - rpt->hostname, rpt->client_ip, rpt->client_port, count); - - rrdcontext_host_child_disconnected(rpt->host); - rrdpush_receiver_replication_reset(rpt); + { + char msg[100 + 1]; + snprintfz(msg, 100, "disconnected (completed %zu updates)", count); + rrdpush_receive_log_status(rpt, msg, "DISCONNECTED"); + } #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -762,48 +844,41 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 0); #endif - rrdhost_set_is_parent_label(--localhost->senders_count); - - // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread - if (!netdata_exit && rpt->host) { - rrd_rdlock(); - rrdhost_wrlock(rpt->host); - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) { - rpt->host->senders_connect_time = 0; - rpt->host->trigger_chart_obsoletion_check = 0; - rpt->host->senders_disconnected_time = now_realtime_sec(); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); - if(health_enabled == CONFIG_BOOLEAN_AUTO) - rpt->host->health_enabled = 0; - } - rrdhost_unlock(rpt->host); - if (rpt->host->receiver == rpt) { - rrdpush_sender_thread_stop(rpt->host); - } - netdata_mutex_unlock(&rpt->host->receiver_lock); - rrd_unlock(); - } + rrdhost_set_is_parent_label(--localhost->connected_children_count); // cleanup close(rpt->fd); return (int)count; } +static void rrdpush_receiver_thread_cleanup(void *ptr) { + struct receiver_state *rpt = (struct receiver_state *) ptr; + worker_unregister(); + + rrdhost_clear_receiver(rpt); + + info("STREAM '%s' [receive from [%s]:%s]: " + "receive thread ended (task id %d)" + , rpt->hostname ? rpt->hostname : "-" + , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-" + , gettid()); + + receiver_state_free(rpt); +} + void *rrdpush_receiver_thread(void *ptr) { netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - struct receiver_state *rpt = (struct receiver_state *)ptr; - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - worker_register("STREAMRCV"); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + + struct receiver_state *rpt = (struct receiver_state *)ptr; + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + rrdpush_receive(rpt); - worker_unregister(); netdata_thread_cleanup_pop(1); return NULL; } - diff --git a/streaming/replication.c b/streaming/replication.c index d659d701..7c1f16b4 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,28 +3,30 @@ #include "replication.h" #include "Judy.h" -#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL #define WORKER_JOB_FIND_NEXT 1 #define WORKER_JOB_QUERYING 2 #define WORKER_JOB_DELETE_ENTRY 3 #define WORKER_JOB_FIND_CHART 4 -#define WORKER_JOB_CHECK_CONSISTENCY 5 -#define WORKER_JOB_BUFFER_COMMIT 6 -#define WORKER_JOB_CLEANUP 7 -#define WORKER_JOB_WAIT 8 +#define WORKER_JOB_PREPARE_QUERY 5 +#define WORKER_JOB_CHECK_CONSISTENCY 6 +#define WORKER_JOB_BUFFER_COMMIT 7 +#define WORKER_JOB_CLEANUP 8 +#define WORKER_JOB_WAIT 9 // master thread worker jobs -#define WORKER_JOB_STATISTICS 9 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 13 -#define WORKER_JOB_CUSTOM_METRIC_DONE 14 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16 +#define WORKER_JOB_STATISTICS 10 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 14 +#define WORKER_JOB_CUSTOM_METRIC_DONE 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 @@ -44,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) { return ret; } +size_t replication_buffers_allocated = 0; + +size_t replication_allocated_buffers(void) { + return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED); +} + // ---------------------------------------------------------------------------- // sending replication replies @@ -51,137 +59,400 @@ struct replication_dimension { STORAGE_POINT sp; struct storage_engine_query_handle handle; bool enabled; + bool skip; DICTIONARY *dict; const DICTIONARY_ITEM *rda; RRDDIM *rd; }; -static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) { +struct replication_query { + RRDSET *st; + + struct { + time_t first_entry_t; + time_t last_entry_t; + } db; + + struct { // what the parent requested + time_t after; + time_t before; + bool enable_streaming; + } request; + + struct { // what the child will do + time_t after; + time_t before; + bool enable_streaming; + + bool locked_data_collection; + bool execute; + bool interrupted; + } query; + + time_t wall_clock_time; + + size_t points_read; + size_t points_generated; + + struct storage_engine_query_ops *ops; + struct replication_request *rq; + + size_t dimensions; + struct replication_dimension data[]; +}; + +static struct replication_query *replication_query_prepare( + RRDSET *st, + time_t db_first_entry, + time_t db_last_entry, + time_t requested_after, + time_t requested_before, + bool requested_enable_streaming, + time_t query_after, + time_t query_before, + bool query_enable_streaming, + time_t wall_clock_time +) { size_t dimensions = rrdset_number_of_dimensions(st); - size_t points_read = 0, points_generated = 0; + struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension)); + __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); - struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; - struct replication_dimension data[dimensions]; - memset(data, 0, sizeof(data)); + q->dimensions = dimensions; + q->st = st; - if(enable_streaming && st->last_updated.tv_sec > before) { - internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)before, - (unsigned long long)st->last_updated.tv_sec - ); - before = st->last_updated.tv_sec; + q->db.first_entry_t = db_first_entry; + q->db.last_entry_t = db_last_entry; + + q->request.after = requested_after, + q->request.before = requested_before, + q->request.enable_streaming = requested_enable_streaming, + + q->query.after = query_after; + q->query.before = query_before; + q->query.enable_streaming = query_enable_streaming; + + q->wall_clock_time = wall_clock_time; + + if (!q->dimensions || !q->query.after || !q->query.before) { + q->query.execute = false; + q->dimensions = 0; + return q; + } + + if(q->query.enable_streaming) { + netdata_spinlock_lock(&st->data_collection_lock); + q->query.locked_data_collection = true; + + if (st->last_updated.tv_sec > q->query.before) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s' " + "has start_streaming = true, " + "adjusting replication before timestamp from %llu to %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long) q->query.before, + (unsigned long long) st->last_updated.tv_sec + ); +#endif + q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time); + } } + q->ops = &st->rrdhost->db[0].eng->api.query_ops; + // prepare our array of dimensions - { - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(unlikely(!rd || !rd_dfe.item || !rd->exposed)) - continue; + size_t count = 0; + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (unlikely(!rd || !rd_dfe.item || !rd->exposed)) + continue; - if (unlikely(rd_dfe.counter >= dimensions)) { - internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - break; - } + if (unlikely(rd_dfe.counter >= q->dimensions)) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + break; + } + + struct replication_dimension *d = &q->data[rd_dfe.counter]; + + d->dict = rd_dfe.dict; + d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + d->rd = rd; - struct replication_dimension *d = &data[rd_dfe.counter]; + q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, + q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); + d->enabled = true; + d->skip = false; + count++; + } + rrddim_foreach_done(rd); + + if(!count) { + // no data for this chart - d->dict = rd_dfe.dict; - d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); - d->rd = rd; + q->query.execute = false; - ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before); - d->enabled = true; + if(q->query.locked_data_collection) { + netdata_spinlock_unlock(&st->data_collection_lock); + q->query.locked_data_collection = false; } - rrddim_foreach_done(rd); + + } + else { + // we have data for this chart + + q->query.execute = true; + } + + return q; +} + +static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(!rd->exposed) continue; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec + ); +} + +static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { + size_t dimensions = q->dimensions; + + if(wb && q->query.enable_streaming) + replication_send_chart_collection_state(wb, q->st); + + if(q->query.locked_data_collection) { + netdata_spinlock_unlock(&q->st->data_collection_lock); + q->query.locked_data_collection = false; + } + + // release all the dictionary items acquired + // finalize the queries + size_t queries = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + q->ops->finalize(&d->handle); + + dictionary_acquired_item_release(d->dict, d->rda); + + // update global statistics + queries++; + } + + if(executed) { + netdata_spinlock_lock(&replication_queries.spinlock); + replication_queries.queries_started += queries; + replication_queries.queries_finished += queries; + replication_queries.points_read += q->points_read; + replication_queries.points_generated += q->points_generated; + netdata_spinlock_unlock(&replication_queries.spinlock); } - time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before; + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); + freez(q); +} + +static void replication_query_align_to_optimal_before(struct replication_query *q) { + if(!q->query.execute || q->query.enable_streaming) + return; + + size_t dimensions = q->dimensions; + time_t expanded_before = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled)) continue; + + time_t new_before = q->ops->align_to_optimal_before(&d->handle); + if (!expanded_before || new_before < expanded_before) + expanded_before = new_before; + } + + if(expanded_before > q->query.before && // it is later than the original + (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page) + expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time + expanded_before < q->wall_clock_time) // it is not later than the wall clock time + q->query.before = expanded_before; +} + +static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { + replication_query_align_to_optimal_before(q); + + time_t after = q->query.after; + time_t before = q->query.before; + size_t dimensions = q->dimensions; + struct storage_engine_query_ops *ops = q->ops; + time_t wall_clock_time = q->wall_clock_time; + + size_t points_read = q->points_read, points_generated = q->points_generated; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + time_t actual_after = 0, actual_before = 0; +#endif + + time_t now = after + 1; + time_t last_end_time_in_buffer = 0; while(now <= before) { - time_t min_start_time = 0, min_end_time = 0; + time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0; for (size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled || d->skip)) continue; // fetch the first valid point for the dimension - int max_skip = 100; - while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) { + int max_skip = 1000; + while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) { d->sp = ops->next_metric(&d->handle); points_read++; } - internal_error(max_skip <= 0, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now); + if(max_skip <= 0) { + d->skip = true; - if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp))) - continue; + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), + (unsigned long long) now); - if(unlikely(!min_start_time)) { - min_start_time = d->sp.start_time; - min_end_time = d->sp.end_time; - } - else { - min_start_time = MIN(min_start_time, d->sp.start_time); - min_end_time = MIN(min_end_time, d->sp.end_time); + continue; } - } - if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)min_start_time, - (unsigned long long)min_end_time, - (unsigned long long)wall_clock_time); - break; - } + if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s)) + // this dimension does not provide any data + continue; - if(unlikely(min_end_time < now)) { -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, - "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); -#endif // NETDATA_LOG_REPLICATION_REQUESTS - break; + time_t update_every = d->sp.end_time_s - d->sp.start_time_s; + if(unlikely(!update_every)) + update_every = q->st->update_every; + + if(unlikely(!min_update_every)) + min_update_every = update_every; + + if(unlikely(!min_start_time)) + min_start_time = d->sp.start_time_s; + + if(unlikely(!min_end_time)) + min_end_time = d->sp.end_time_s; + + min_update_every = MIN(min_update_every, update_every); + max_update_every = MAX(max_update_every, update_every); + + min_start_time = MIN(min_start_time, d->sp.start_time_s); + max_start_time = MAX(max_start_time, d->sp.start_time_s); + + min_end_time = MIN(min_end_time, d->sp.end_time_s); + max_end_time = MAX(max_end_time, d->sp.end_time_s); } - if(unlikely(min_end_time <= min_start_time)) - min_start_time = min_end_time - st->update_every; + if (unlikely(min_update_every != max_update_every || + min_start_time != max_start_time)) { - if(unlikely(!actual_after)) { - actual_after = min_end_time; - actual_before = min_end_time; + time_t fix_min_start_time; + if(last_end_time_in_buffer && + last_end_time_in_buffer >= min_start_time && + last_end_time_in_buffer <= max_start_time) { + fix_min_start_time = last_end_time_in_buffer; + } + else + fix_min_start_time = min_end_time - min_update_every; + + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions " + "update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + min_update_every, max_update_every, + min_start_time, max_start_time, + min_end_time, max_end_time, + now, last_end_time_in_buffer, + fix_min_start_time + ); + + min_start_time = fix_min_start_time; } - else + + if(likely(min_start_time <= now && min_end_time >= now)) { + // we have a valid point + + if (unlikely(min_end_time == min_start_time)) + min_start_time = min_end_time - q->st->update_every; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if (unlikely(!actual_after)) + actual_after = min_end_time; + actual_before = min_end_time; +#endif - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n" - , (unsigned long long)min_start_time - , (unsigned long long)min_end_time - , (unsigned long long)wall_clock_time - ); + if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { + q->query.before = last_end_time_in_buffer; + q->query.enable_streaming = false; - // output the replay values for this time - for (size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. " + "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", + buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), + q->request.after, q->request.before, q->request.enable_streaming?"true":"false", + q->query.after, q->query.before, q->query.enable_streaming?"true":"false"); - if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time)) - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", - rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + q->query.interrupted = true; - else - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", - rrddim_id(d->rd)); + break; + } + last_end_time_in_buffer = min_end_time; - points_generated++; - } + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n", + (unsigned long long) min_start_time, + (unsigned long long) min_end_time, + (unsigned long long) wall_clock_time + ); + + // output the replay values for this time + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + if (likely( d->sp.start_time_s <= min_end_time && + d->sp.end_time_s >= min_end_time && + !storage_point_is_unset(d->sp) && + !storage_point_is_gap(d->sp))) { - now = min_end_time + 1; + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", + rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + + points_generated++; + } + } + + now = min_end_time + 1; + } + else if(unlikely(min_end_time < now)) + // the query does not progress + break; + else + // we have gap - all points are in the future + now = min_start_time; } #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -202,110 +473,89 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS - // release all the dictionary items acquired - // finalize the queries - size_t queries = 0; - for(size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + q->points_read = points_read; + q->points_generated = points_generated; - ops->finalize(&d->handle); + bool finished_with_gap = false; + if(last_end_time_in_buffer < before - q->st->update_every) + finished_with_gap = true; - dictionary_acquired_item_release(d->dict, d->rda); + return finished_with_gap; +} - // update global statistics - queries++; - } +static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { + time_t wall_clock_time = now_realtime_sec(); - netdata_spinlock_lock(&replication_queries.spinlock); - replication_queries.queries_started += queries; - replication_queries.queries_finished += queries; - replication_queries.points_read += points_read; - replication_queries.points_generated += points_generated; - netdata_spinlock_unlock(&replication_queries.spinlock); + if(requested_after > requested_before) { + // flip them + time_t t = requested_before; + requested_before = requested_after; + requested_after = t; + } - return before; -} + if(requested_after > wall_clock_time) { + requested_after = 0; + requested_before = 0; + requested_enable_streaming = true; + } -static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(!rd->exposed) continue; - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", - rrddim_id(rd), - (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, - rd->last_collected_value, - rd->last_calculated_value, - rd->last_stored_value - ); + if(requested_before > wall_clock_time) { + requested_before = wall_clock_time; + requested_enable_streaming = true; } - rrddim_foreach_done(rd); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", - (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, - (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec - ); -} + time_t query_after = requested_after; + time_t query_before = requested_before; + bool query_enable_streaming = requested_enable_streaming; -bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { - time_t query_after = after; - time_t query_before = before; - time_t now = now_realtime_sec(); - time_t tolerance = 2; // sometimes from the time we get this value, to the time we check, - // a data collection has been made - // so, we give this tolerance to detect invalid timestamps - - // find the first entry we have - time_t first_entry_local = rrdset_first_entry_t(st); - if(first_entry_local > now + tolerance) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)first_entry_local, (unsigned long long)now); - first_entry_local = now; + time_t db_first_entry = 0, db_last_entry = 0; + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); + + if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { + // no data requested - just enable streaming + ; } + else { + if (query_after < db_first_entry) + query_after = db_first_entry; - if (query_after < first_entry_local) - query_after = first_entry_local; + if (query_before > db_last_entry) + query_before = db_last_entry; - // find the latest entry we have - time_t last_entry_local = st->last_updated.tv_sec; - if(!last_entry_local) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - last_entry_local = rrdset_last_entry_t(st); - if(!last_entry_local) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - last_entry_local = now; + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (requested_enable_streaming) + query_before = db_last_entry; + + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; } - } - if(last_entry_local > now + tolerance) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)last_entry_local, (unsigned long long)now); - last_entry_local = now; + query_enable_streaming = (requested_enable_streaming || + query_before == db_last_entry || + !requested_after || + !requested_before) ? true : false; } - if (query_before > last_entry_local) - query_before = last_entry_local; + return replication_query_prepare( + st, + db_first_entry, db_last_entry, + requested_after, requested_before, requested_enable_streaming, + query_after, query_before, query_enable_streaming, + wall_clock_time); +} - // if the parent asked us to start streaming, then fill the rest with the data that we have - if (start_streaming) - query_before = last_entry_local; +void replication_response_cancel_and_finalize(struct replication_query *q) { + replication_query_finalize(NULL, q, false); +} - if (query_after > query_before) { - time_t tmp = query_before; - query_before = query_after; - query_after = tmp; - } +static bool sender_is_still_connected_for_this_request(struct replication_request *rq); - bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; +bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { + struct replication_request *rq = q->rq; + RRDSET *st = q->st; + RRDHOST *host = st->rrdhost; // we might want to optimize this by filling a temporary buffer // and copying the result to the host's buffer in order to avoid @@ -314,25 +564,24 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - if(after != 0 && before != 0) - before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now); - else { - after = 0; - before = 0; - enable_streaming = true; - } + bool locked_data_collection = q->query.locked_data_collection; + q->query.locked_data_collection = false; - // get again the world clock time - time_t world_clock_time = now_realtime_sec(); - if(enable_streaming) { - if(now < world_clock_time) { - // we needed time to execute this request - // so, the parent will need to replicate more data - enable_streaming = false; - } - else - replicate_chart_collection_state(wb, st); - } + bool finished_with_gap = false; + if(q->query.execute) + finished_with_gap = replication_query_execute(wb, q, max_msg_size); + + time_t after = q->request.after; + time_t before = q->query.before; + bool enable_streaming = q->query.enable_streaming; + + replication_query_finalize(wb, q, q->query.execute); + q = NULL; // IMPORTANT: q is invalid now + + // get a fresh retention to send to the parent + time_t wall_clock_time = now_realtime_sec(); + time_t db_first_entry, db_last_entry; + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); // end with first/last entries we have, and the first start time and // last end time of the data we sent @@ -342,7 +591,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t (int)st->update_every // child first db time, child end db time - , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local + , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry // start streaming boolean , enable_streaming ? "true" : "false" @@ -351,13 +600,40 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t , (unsigned long long)after, (unsigned long long)before // child world clock time - , (unsigned long long)world_clock_time + , (unsigned long long)wall_clock_time ); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); sender_commit(host->sender, wb); worker_is_busy(WORKER_JOB_CLEANUP); + if(enable_streaming) { + if(sender_is_still_connected_for_this_request(rq)) { + // enable normal streaming if we have to + // but only if the sender buffer has not been flushed since we started + + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + + if(!finished_with_gap) + st->upstream_resync_time_s = 0; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + } + } + + if(locked_data_collection) + netdata_spinlock_unlock(&st->data_collection_lock); + return enable_streaming; } @@ -376,14 +652,14 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has - time_t world_time_t; // the current time of the child + time_t wall_clock_time; // the current time of the child + bool fixed_last_entry; // when set we set the last entry to wall clock time } child_db; struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have - bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed - time_t now; // the current local world clock time + time_t wall_clock_time; // the current local world clock time } local_db; struct { @@ -403,9 +679,36 @@ struct replication_request_details { } wanted; }; -static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { +static void replicate_log_request(struct replication_request_details *r, const char *msg) { +#ifdef NETDATA_INTERNAL_CHECKS + internal_error(true, +#else + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, +#endif + "REPLAY ERROR: 'host:%s/chart:%s' child sent: " + "db from %ld to %ld%s, wall clock time %ld, " + "last request from %ld to %ld, " + "issue: %s - " + "sending replication request from %ld to %ld, start streaming %s", + rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st), + r->child_db.first_entry_t, + r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "", + r->child_db.wall_clock_time, + r->last_request.after, + r->last_request.before, + msg, + r->wanted.after, + r->wanted.before, + r->wanted.start_streaming ? "true" : "false"); +} + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) { RRDSET *st = r->st; + if(log) + replicate_log_request(r, msg); + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) st->rrdhost->receiver->replication_first_time_t = r->wanted.after; @@ -422,7 +725,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " - "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf @@ -432,7 +735,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , r->child_db.first_entry_t, r->child_db.last_entry_t , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" , r->local_db.first_entry_t, r->local_db.last_entry_t - , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now + , r->local_db.now , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" @@ -459,7 +762,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c } bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, - time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { @@ -472,16 +775,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .st = st, .child_db = { - .first_entry_t = first_entry_child, - .last_entry_t = last_entry_child, - .world_time_t = child_world_time, + .first_entry_t = child_first_entry, + .last_entry_t = child_last_entry, + .wall_clock_time = child_wall_clock_time, + .fixed_last_entry = false, }, .local_db = { - .first_entry_t = rrdset_first_entry_t(st), - .last_entry_t = rrdset_last_entry_t(st), - .last_entry_t_adjusted_to_now = false, - .now = now_realtime_sec(), + .first_entry_t = 0, + .last_entry_t = 0, + .wall_clock_time = now_realtime_sec(), }, .last_request = { @@ -496,12 +799,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST }, }; - // check our local database retention - if(r.local_db.last_entry_t > r.local_db.now) { - r.local_db.last_entry_t = r.local_db.now; - r.local_db.last_entry_t_adjusted_to_now = true; + if(r.child_db.last_entry_t > r.child_db.wall_clock_time) { + replicate_log_request(&r, "child's db last entry > child's wall clock time"); + r.child_db.last_entry_t = r.child_db.wall_clock_time; + r.child_db.fixed_last_entry = true; } + rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0); + // let's find the GAP we have if(!r.last_request.after || !r.last_request.before) { // there is no previous request @@ -511,7 +816,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate - r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; + r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate; } else { @@ -522,27 +827,30 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // we want all the data up to now - r.gap.to = r.local_db.now; + r.gap.to = r.local_db.wall_clock_time; // The gap is now r.gap.from -> r.gap.to if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) - return send_replay_chart_cmd(&r, "empty replication request, replication is disabled"); - - if (unlikely(!r.child_db.last_entry_t)) - return send_replay_chart_cmd(&r, "empty replication request, child has no stored data"); + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false); if (unlikely(!rrdset_number_of_dimensions(st))) - return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions"); + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false); + + if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false); - if (r.child_db.first_entry_t <= 0) - return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid"); + if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0)) + return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true); - if (r.child_db.first_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)"); + if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time)) + return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true); - if (r.local_db.last_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one"); + if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true); + + if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false); // let's find what the child can provide to fill that gap @@ -564,15 +872,22 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.before > r.child_db.last_entry_t) r.wanted.before = r.child_db.last_entry_t; - if(r.wanted.after > r.wanted.before) - r.wanted.after = r.wanted.before; + if(r.wanted.after > r.wanted.before) { + r.wanted.after = 0; + r.wanted.before = 0; + r.wanted.start_streaming = true; + return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true); + } // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child - r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); + r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step || + r.wanted.before >= r.child_db.last_entry_t || + r.wanted.before >= r.child_db.wall_clock_time || + r.wanted.before >= r.local_db.wall_clock_time); // the wanted timeframe is now r.wanted.after -> r.wanted.before // send it - return send_replay_chart_cmd(&r, "OK"); + return send_replay_chart_cmd(&r, "OK", false); } // ---------------------------------------------------------------------------- @@ -585,13 +900,20 @@ struct replication_request { STRING *chart_id; // the chart of the request time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) time_t before; // the end time of the query (maybe zero) - bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request Word_t unique_id; // auto-increment, later requests have bigger - bool found; // used as a result boolean for the find call + + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming bool indexed_in_judy; // true when the request is indexed in judy bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full + bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing + + // prepare ahead members - preprocessing + bool found; // used as a result boolean for the find call + bool executed; // used to detect if we have skipped requests while preprocessing + RRDSET *st; // caching of the chart during preprocessing + struct replication_query *q; // the preprocessing query initialization }; // replication sort entry in JudyL array @@ -631,6 +953,7 @@ static struct replication_thread { struct { size_t executed; // the number of replication requests executed size_t latest_first_time; // the 'after' timestamp of the last request we executed + size_t memory; // the total memory allocated by replication } atomic; // access should be with atomic operations struct { @@ -663,6 +986,7 @@ static struct replication_thread { .atomic = { .executed = 0, .latest_first_time = 0, + .memory = 0, }, .main_thread = { .last_executed = 0, @@ -671,6 +995,10 @@ static struct replication_thread { }, }; +size_t replication_allocated_memory(void) { + return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED); +} + #define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED) #define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED) @@ -723,6 +1051,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc fatal_when_replication_is_not_locked_for_me(); struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); + __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); rrdpush_sender_pending_replication_requests_plus_one(rq->sender); @@ -734,11 +1063,13 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc rq->unique_id = rse->unique_id; rq->indexed_in_judy = false; rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; return rse; } static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { freez(rse); + __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); } static void replication_sort_entry_add(struct replication_request *rq) { @@ -747,6 +1078,7 @@ static void replication_sort_entry_add(struct replication_request *rq) { if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; + rq->not_indexed_preprocessing = false; replication_globals.unsafe.pending_no_room++; replication_recursive_unlock(); return; @@ -771,23 +1103,33 @@ static void replication_sort_entry_add(struct replication_request *rq) { Pvoid_t *inner_judy_ptr; // find the outer judy entry, using after as key - inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); - if(!inner_judy_ptr) - inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR)) + fatal("REPLICATION: corrupted outer judyL"); // add it to the inner judy, using unique_id as key + size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr); Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr); + if(unlikely(!item || item == PJERR)) + fatal("REPLICATION: corrupted inner judyL"); + *item = rse; rq->indexed_in_judy = true; rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); + + __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED); } -static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { +static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) { fatal_when_replication_is_not_locked_for_me(); bool inner_judy_deleted = false; @@ -798,19 +1140,30 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); rse->rq->indexed_in_judy = false; + rse->rq->not_indexed_preprocessing = preprocessing; + + size_t memory_saved = 0; // delete it from the inner judy + size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); + memory_saved = mem_before_inner_judyl - mem_after_inner_judyl; // if no items left, delete it from the outer judy if(**inner_judy_ppptr == NULL) { + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + memory_saved += mem_before_outer_judyl - mem_after_outer_judyl; inner_judy_deleted = true; } // free memory replication_sort_entry_destroy(rse); + __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED); + return inner_judy_deleted; } @@ -826,7 +1179,7 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); if (our_item_pptr) { rse_to_delete = *our_item_pptr; - replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false); if(buffer_full) { replication_globals.unsafe.pending_no_room++; @@ -844,13 +1197,6 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff replication_recursive_unlock(); } -static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) { - if(unlikely(first)) - return JudyLFirst(PArray, PIndex, PJE0); - - return JudyLNext(PArray, PIndex, PJE0); -} - static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; @@ -881,7 +1227,7 @@ static struct replication_request replication_request_get_first_available() { } bool find_same_after = true; - while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) { + while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) { Pvoid_t *our_item_pptr; if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) @@ -898,14 +1244,11 @@ static struct replication_request replication_request_get_first_available() { // set the return result to found rq_to_return.found = true; - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true)) // we removed the item from the outer JudyL break; } - // call JudyLNext from now on - find_same_after = false; - // prepare for the next iteration on the outer loop replication_globals.unsafe.queue.unique_id = 0; } @@ -945,7 +1288,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ replication_recursive_lock(); - if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) { // we can replace this command internal_error( true, @@ -958,7 +1301,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ rq->before = rq_new->before; rq->start_streaming = rq_new->start_streaming; } - else if(!rq->indexed_in_judy) { + else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) { replication_sort_entry_add(rq); internal_error( true, @@ -1001,55 +1344,57 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma string_freez(rq->chart_id); } +static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { + return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); +}; + static bool replication_execute_request(struct replication_request *rq, bool workers) { bool ret = false; - if(likely(workers)) - worker_is_busy(WORKER_JOB_FIND_CHART); + if(!rq->st) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_FIND_CHART); + + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } - RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); - if(!st) { + if(!rq->st) { internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); goto cleanup; } - if(likely(workers)) - worker_is_busy(WORKER_JOB_QUERYING); - netdata_thread_disable_cancelability(); - // send the replication data - bool start_streaming = replicate_chart_response( - st->rrdhost, st, rq->start_streaming, rq->after, rq->before); + if(!rq->q) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_PREPARE_QUERY); - netdata_thread_enable_cancelability(); + rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + } - if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) { - // enable normal streaming if we have to - // but only if the sender buffer has not been flushed since we started + if(likely(workers)) + worker_is_busy(WORKER_JOB_QUERYING); - if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + // send the replication data + rq->q->rq = rq; + replication_response_execute_and_finalize( + rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); -#endif - } - else - internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", - rrdhost_hostname(st->rrdhost), string2str(rq->chart_id)); - } + rq->q = NULL; + netdata_thread_enable_cancelability(); __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); ret = true; cleanup: + if(rq->q) { + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + } + string_freez(rq->chart_id); worker_is_idle(); return ret; @@ -1068,6 +1413,7 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), .indexed_in_judy = false, .not_indexed_buffer_full = false, + .not_indexed_preprocessing = false, }; if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) @@ -1079,13 +1425,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks - replication_recursive_lock(); dictionary_flush(sender->replication.requests); - replication_recursive_unlock(); } void replication_init_sender(struct sender_state *sender) { - sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct replication_request)); + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); @@ -1107,9 +1453,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { - if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) { + if(rq->indexed_in_judy) replication_sort_entry_del(rq, true); - } } dfe_done(rq); @@ -1122,9 +1467,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { - if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing)) replication_sort_entry_add(rq); - } } dfe_done(rq); @@ -1214,6 +1558,7 @@ static void replication_initialize_workers(bool master) { worker_register_job_name(WORKER_JOB_QUERYING, "querying"); worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); + worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query"); worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); @@ -1235,24 +1580,137 @@ static void replication_initialize_workers(bool master) { #define REQUEST_QUEUE_EMPTY (-1) #define REQUEST_CHART_NOT_FOUND (-2) -static int replication_execute_next_pending_request(void) { - worker_is_busy(WORKER_JOB_FIND_NEXT); - struct replication_request rq = replication_request_get_first_available(); +static int replication_execute_next_pending_request(bool cancel) { + static __thread int max_requests_ahead = 0; + static __thread struct replication_request *rqs = NULL; + static __thread int rqs_last_executed = 0, rqs_last_prepared = 0; + static __thread size_t queue_rounds = 0; (void)queue_rounds; + struct replication_request *rq; + + if(unlikely(cancel)) { + if(rqs) { + size_t cancelled = 0; + do { + if (++rqs_last_executed >= max_requests_ahead) + rqs_last_executed = 0; + + rq = &rqs[rqs_last_executed]; + + if (rq->q) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); + + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + cancelled++; + } + + rq->executed = true; + rq->found = false; + + } while (rqs_last_executed != rqs_last_prepared); + + internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); + } + return REQUEST_QUEUE_EMPTY; + } + + if(unlikely(!rqs)) { + max_requests_ahead = get_netdata_cpus() / 2; + + if(max_requests_ahead > libuv_worker_threads * 2) + max_requests_ahead = libuv_worker_threads * 2; + + if(max_requests_ahead < 2) + max_requests_ahead = 2; + + rqs = callocz(max_requests_ahead, sizeof(struct replication_request)); + __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); + } + + // fill the queue + do { + if(++rqs_last_prepared >= max_requests_ahead) { + rqs_last_prepared = 0; + queue_rounds++; + } + + internal_fatal(rqs[rqs_last_prepared].q, + "REPLAY FATAL: slot is used by query that has not been executed!"); + + worker_is_busy(WORKER_JOB_FIND_NEXT); + rqs[rqs_last_prepared] = replication_request_get_first_available(); + rq = &rqs[rqs_last_prepared]; + + if(rq->found) { + if (!rq->st) { + worker_is_busy(WORKER_JOB_FIND_CHART); + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } + + if (rq->st && !rq->q) { + worker_is_busy(WORKER_JOB_PREPARE_QUERY); + rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + } - if(unlikely(!rq.found)) { + rq->executed = false; + } + + } while(rq->found && rqs_last_prepared != rqs_last_executed); + + // pick the first usable + do { + if (++rqs_last_executed >= max_requests_ahead) + rqs_last_executed = 0; + + rq = &rqs[rqs_last_executed]; + + if(rq->found) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + + if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) { + // the sender has reconnected since this request was queued, + // we can safely throw it away, since the parent will resend it + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) { + // the sender buffer is full, so we can ignore this request, + // it has already been marked as 'preprocessed' in the dictionary, + // and the sender will put it back in when there is + // enough room in the buffer for processing replication requests + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else { + // we can execute this, + // delete it from the dictionary + worker_is_busy(WORKER_JOB_DELETE_ENTRY); + dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id)); + } + } + else + internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); + + } while(!rq->found && rqs_last_executed != rqs_last_prepared); + + if(unlikely(!rq->found)) { worker_is_idle(); return REQUEST_QUEUE_EMPTY; } - // delete the request from the dictionary - worker_is_busy(WORKER_JOB_DELETE_ENTRY); - if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id))) - error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index", - rrdhost_hostname(rq.sender->host), string2str(rq.chart_id)); + replication_set_latest_first_time(rq->after); - replication_set_latest_first_time(rq.after); + bool chart_found = replication_execute_request(rq, true); + rq->executed = true; + rq->found = false; + rq->q = NULL; - if(unlikely(!replication_execute_request(&rq, true))) { + if(unlikely(!chart_found)) { worker_is_idle(); return REQUEST_CHART_NOT_FOUND; } @@ -1262,6 +1720,7 @@ static int replication_execute_next_pending_request(void) { } static void replication_worker_cleanup(void *ptr __maybe_unused) { + replication_execute_next_pending_request(true); worker_unregister(); } @@ -1270,8 +1729,9 @@ static void *replication_worker_thread(void *ptr) { netdata_thread_cleanup_push(replication_worker_cleanup, ptr); - while(!netdata_exit) { - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + while(service_running(SERVICE_REPLICATION)) { + if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { + sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); sleep_usec(1 * USEC_PER_SEC); @@ -1286,13 +1746,17 @@ static void replication_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + replication_execute_next_pending_request(true); + int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL); freez(replication_globals.main_thread.threads_ptrs[i]); + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); } freez(replication_globals.main_thread.threads_ptrs); replication_globals.main_thread.threads_ptrs = NULL; + __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); // custom code worker_unregister(); @@ -1312,10 +1776,14 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(--threads) { replication_globals.main_thread.threads = threads; replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *)); + __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); for(int i = 0; i < threads ;i++) { + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2); replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t)); - netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION", + __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); + netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); } } @@ -1333,7 +1801,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { size_t last_executed = 0; size_t last_sender_resets = 0; - while(!netdata_exit) { + while(service_running(SERVICE_REPLICATION)) { // statistics usec_t now_mono_ut = now_monotonic_usec(); @@ -1395,7 +1863,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_idle(); } - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); @@ -1403,14 +1871,16 @@ void *replication_thread_main(void *ptr __maybe_unused) { // the timeout also defines now frequently we will traverse all the pending requests // when the outbound buffers of all senders is full usec_t timeout; - if(slow) + if(slow) { // no work to be done, wait for a request to come in timeout = 1000 * USEC_PER_MS; + sender_thread_buffer_free(); + } else if(replication_globals.unsafe.pending > 0) { - if(replication_globals.unsafe.sender_resets == last_sender_resets) { + if(replication_globals.unsafe.sender_resets == last_sender_resets) timeout = 1000 * USEC_PER_MS; - } + else { // there are pending requests waiting to be executed, // but none could be executed at this time. diff --git a/streaming/replication.h b/streaming/replication.h index 00462cc3..f5b64706 100644 --- a/streaming/replication.h +++ b/streaming/replication.h @@ -21,7 +21,7 @@ typedef int (*send_command)(const char *txt, void *data); bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *rh, RRDSET *rs, - time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, time_t response_first_start_time, time_t response_last_end_time); void replication_init_sender(struct sender_state *sender); @@ -30,4 +30,7 @@ void replication_sender_delete_pending_requests(struct sender_state *sender); void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming); void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s); +size_t replication_allocated_memory(void); +size_t replication_allocated_buffers(void); + #endif /* REPLICATION_H */ diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index a57f1b08..256fa828 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -108,7 +108,7 @@ int rrdpush_init() { default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate); default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step); - rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time); + rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); #ifdef ENABLE_COMPRESSION default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, @@ -295,40 +295,14 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { - time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0); - time_t last_entry_local = st->last_updated.tv_sec; - - if(unlikely(!last_entry_local)) - last_entry_local = rrdset_last_entry_t(st); + time_t db_first_time_t, db_last_time_t; time_t now = now_realtime_sec(); - if(unlikely(last_entry_local > now)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - last_entry_local, now); - last_entry_local = now; - } - - if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - first_entry_local, last_entry_local); - first_entry_local = last_entry_local - st->update_every; - } - - if(unlikely(!first_entry_local && last_entry_local)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - first_entry_local, last_entry_local); - first_entry_local = last_entry_local; - } + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0); buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n", - (unsigned long long)first_entry_local, - (unsigned long long)last_entry_local, + (unsigned long long)db_first_time_t, + (unsigned long long)db_last_time_t, (unsigned long long)now); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); @@ -342,17 +316,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { #endif } - st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); + st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); return replication_progress; } // sends the current chart dimensions -static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time) + if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) buffer_print_llu(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); @@ -399,6 +373,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); sender_commit(host->sender, wb); + sender_thread_buffer_free(); return true; } @@ -463,6 +438,8 @@ void rrdpush_send_host_labels(RRDHOST *host) { buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); sender_commit(host->sender, wb); + + sender_thread_buffer_free(); } void rrdpush_claimed_id(RRDHOST *host) @@ -480,6 +457,8 @@ void rrdpush_claimed_id(RRDHOST *host) rrdhost_aclk_state_unlock(host); sender_commit(host->sender, wb); + + sender_thread_buffer_free(); } int connect_to_one_of_destinations( @@ -496,20 +475,11 @@ int connect_to_one_of_destinations( for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) { time_t now = now_realtime_sec(); - if(d->postpone_reconnection_until > now) { - info( - "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds", - rrdhost_hostname(host), - string2str(d->destination), - default_port, - d->last_handshake, d->last_error?d->last_error:"unset reason description", - (int)(d->postpone_reconnection_until - now)); - + if(d->postpone_reconnection_until > now) continue; - } info( - "STREAM %s: attempting to connect to '%s' (default port: %d)...", + "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), default_port); @@ -528,8 +498,8 @@ int connect_to_one_of_destinations( // move the current item to the end of the list // without this, this destination will break the loop again and again // not advancing the destinations to find one that may work - DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next); - DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next); break; } @@ -550,7 +520,9 @@ bool destinations_init_add_one(char *entry, void *data) { struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); d->destination = string_strdupz(entry); - DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next); + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); @@ -577,9 +549,10 @@ void rrdpush_destinations_init(RRDHOST *host) { void rrdpush_destinations_free(RRDHOST *host) { while (host->destinations) { struct rrdpush_destinations *tmp = host->destinations; - DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next); string_freez(tmp->destination); freez(tmp); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); } host->destinations = NULL; @@ -590,25 +563,16 @@ void rrdpush_destinations_free(RRDHOST *host) { // Either the receiver lost the connection or the host is being destroyed. // The sender mutex guards thread creation, any spurious data is wiped on reconnection. -void rrdpush_sender_thread_stop(RRDHOST *host) { - +void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { if (!host->sender) return; netdata_mutex_lock(&host->sender->mutex); - netdata_thread_t thr = 0; if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); - - info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host)); - - // signal the thread that we want to join it - rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); - // copy the thread id, so that we will be waiting for the right one - // even if a new one has been spawn - thr = host->rrdpush_sender_thread; + host->sender->exit.shutdown = true; + host->sender->exit.reason = reason; // signal it to cancel netdata_thread_cancel(host->rrdpush_sender_thread); @@ -616,11 +580,14 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_mutex_unlock(&host->sender->mutex); - if(thr != 0) { - info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host)); - void *result; - netdata_thread_join(thr, &result); - info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host)); + if(wait) { + netdata_mutex_lock(&host->sender->mutex); + while(host->sender->tid) { + netdata_mutex_unlock(&host->sender->mutex); + sleep_usec(10 * USEC_PER_MS); + netdata_mutex_lock(&host->sender->mutex); + } + netdata_mutex_unlock(&host->sender->mutex); } } @@ -638,9 +605,9 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host)); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); - if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender)) + if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); @@ -654,7 +621,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info."); - return 401; + return HTTP_RESP_UNAUTHORIZED; } int rrdpush_receiver_too_busy_now(struct web_client *w) { @@ -662,21 +629,42 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) { // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later."); - return 503; + return HTTP_RESP_SERVICE_UNAVAILABLE; } void *rrdpush_receiver_thread(void *ptr); int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { - info("clients wants to STREAM metrics."); - char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL; - int32_t utc_offset = 0; - int update_every = default_rrd_update_every; - uint32_t stream_version = UINT_MAX; - char buf[GUID_LEN + 1]; + if(!service_running(ABILITY_STREAMING_CONNECTIONS)) + return rrdpush_receiver_too_busy_now(w); + + struct receiver_state *rpt = callocz(1, sizeof(*rpt)); + rpt->last_msg_t = now_realtime_sec(); + rpt->capabilities = STREAM_CAP_INVALID; + rpt->hops = 1; + + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); + + rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info)); + rpt->system_info->hops = rpt->hops; + + rpt->fd = w->ifd; + rpt->client_ip = strdupz(w->client_ip); + rpt->client_port = strdupz(w->client_port); + + rpt->config.update_every = default_rrd_update_every; + +#ifdef ENABLE_HTTPS + rpt->ssl.conn = w->ssl.conn; + rpt->ssl.flags = w->ssl.flags; + + w->ssl.conn = NULL; + w->ssl.flags = NETDATA_SSL_START; +#endif + + // parse the parameters and fill rpt and rpt->system_info - struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); - system_info->hops = 1; while(url) { char *value = mystrsep(&url, "&"); if(!value || !*value) continue; @@ -685,178 +673,307 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { if(!name || !*name) continue; if(!value || !*value) continue; - if(!strcmp(name, "key")) - key = value; - else if(!strcmp(name, "hostname")) - hostname = value; - else if(!strcmp(name, "registry_hostname")) - registry_hostname = value; - else if(!strcmp(name, "machine_guid")) - machine_guid = value; + if(!strcmp(name, "key") && !rpt->key) + rpt->key = strdupz(value); + + else if(!strcmp(name, "hostname") && !rpt->hostname) + rpt->hostname = strdupz(value); + + else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname) + rpt->registry_hostname = strdupz(value); + + else if(!strcmp(name, "machine_guid") && !rpt->machine_guid) + rpt->machine_guid = strdupz(value); + else if(!strcmp(name, "update_every")) - update_every = (int)strtoul(value, NULL, 0); - else if(!strcmp(name, "os")) - os = value; - else if(!strcmp(name, "timezone")) - timezone = value; - else if(!strcmp(name, "abbrev_timezone")) - abbrev_timezone = value; + rpt->config.update_every = (int)strtoul(value, NULL, 0); + + else if(!strcmp(name, "os") && !rpt->os) + rpt->os = strdupz(value); + + else if(!strcmp(name, "timezone") && !rpt->timezone) + rpt->timezone = strdupz(value); + + else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone) + rpt->abbrev_timezone = strdupz(value); + else if(!strcmp(name, "utc_offset")) - utc_offset = (int32_t)strtol(value, NULL, 0); + rpt->utc_offset = (int32_t)strtol(value, NULL, 0); + else if(!strcmp(name, "hops")) - system_info->hops = (uint16_t) strtoul(value, NULL, 0); + rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0); + else if(!strcmp(name, "ml_capable")) - system_info->ml_capable = strtoul(value, NULL, 0); + rpt->system_info->ml_capable = strtoul(value, NULL, 0); + else if(!strcmp(name, "ml_enabled")) - system_info->ml_enabled = strtoul(value, NULL, 0); + rpt->system_info->ml_enabled = strtoul(value, NULL, 0); + else if(!strcmp(name, "mc_version")) - system_info->mc_version = strtoul(value, NULL, 0); - else if(!strcmp(name, "tags")) - tags = value; - else if(!strcmp(name, "ver")) - stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + rpt->system_info->mc_version = strtoul(value, NULL, 0); + + else if(!strcmp(name, "tags") && !rpt->tags) + rpt->tags = strdupz(value); + + else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + else { // An old Netdata child does not have a compatible streaming protocol, map to something sane. if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) name = "NETDATA_HOST_OS_NAME"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID")) name = "NETDATA_HOST_OS_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")) name = "NETDATA_HOST_OS_ID_LIKE"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")) name = "NETDATA_HOST_OS_VERSION"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")) name = "NETDATA_HOST_OS_VERSION_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) name = "NETDATA_HOST_OS_DETECTION"; - else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) { - stream_version = convert_stream_version_to_capabilities(1); - } - if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { - info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", - w->client_ip, w->client_port, name, value); + else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(1); + + if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { + info("STREAM '%s' [receive from [%s]:%s]: " + "request has parameter '%s' = '%s', which is not used." + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" + , rpt->client_ip, rpt->client_port + , name, value); } } } - if (stream_version == UINT_MAX) - stream_version = convert_stream_version_to_capabilities(0); + if (rpt->capabilities & STREAM_CAP_INVALID) + // no version is supplied, assume version 0; + rpt->capabilities = convert_stream_version_to_capabilities(0); - if(!key || !*key) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY"); - error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port); - return rrdpush_receiver_permission_denied(w); + // find the program name and version + if(w->user_agent && w->user_agent[0]) { + char *t = strchr(w->user_agent, '/'); + if(t && *t) { + *t = '\0'; + t++; + } + + rpt->program_name = strdupz(w->user_agent); + if(t && *t) rpt->program_version = strdupz(t); } - if(!hostname || !*hostname) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME"); - error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port); + // check if we should accept this connection + + if(!rpt->key || !*rpt->key) { + rrdpush_receive_log_status( + rpt, + "request without an API key", + "NO API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(!machine_guid || !*machine_guid) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port); + if(!rpt->hostname || !*rpt->hostname) { + rrdpush_receive_log_status( + rpt, + "request without a hostname", + "NO HOSTNAME PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(regenerate_guid(key, buf) == -1) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key); + if(!rpt->registry_hostname) + rpt->registry_hostname = strdupz(rpt->hostname); + + if(!rpt->machine_guid || !*rpt->machine_guid) { + rrdpush_receive_log_status( + rpt, + "request without a machine GUID", + "NO MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(regenerate_guid(machine_guid, buf) == -1) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid); - return rrdpush_receiver_permission_denied(w); + { + char buf[GUID_LEN + 1]; + + if (regenerate_guid(rpt->key, buf) == -1) { + rrdpush_receive_log_status( + rpt, + "API key is not a valid UUID (use the command uuidgen to generate one)", + "INVALID API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if (regenerate_guid(rpt->machine_guid, buf) == -1) { + rrdpush_receive_log_status( + rpt, + "machine GUID is not a valid UUID", + "INVALID MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } } - const char *api_key_type = appconfig_get(&stream_config, key, "type", "api"); + const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api"); if(!api_key_type || !*api_key_type) api_key_type = "unknown"; if(strcmp(api_key_type, "api") != 0) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY"); - error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type); + rrdpush_receive_log_status( + rpt, + "API key is a machine GUID", + "INVALID API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); + if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) { + rrdpush_receive_log_status( + rpt, + "API key is not enabled", + "API KEY DISABLED PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } { - SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *key_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->key, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT); + if(key_allow_from) { if(!simple_pattern_matches(key_allow_from, w->client_ip)) { simple_pattern_free(key_allow_from); - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key); + + rrdpush_receive_log_status( + rpt, + "API key is not allowed from this IP", + "NOT ALLOWED IP PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } + simple_pattern_free(key_allow_from); } } - const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine"); - if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; - if(strcmp(machine_guid_type, "machine") != 0) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type); - return rrdpush_receiver_permission_denied(w); + { + const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine"); + if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; + + if (strcmp(machine_guid_type, "machine") != 0) { + rrdpush_receive_log_status( + rpt, + "machine GUID is an API key", + "INVALID MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } } - if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); + if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) { + rrdpush_receive_log_status( + rpt, + "machine GUID is not enabled", + "MACHINE GUID DISABLED PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } { - SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *machine_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT); + if(machine_allow_from) { if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { simple_pattern_free(machine_allow_from); - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP"); - error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid); + + rrdpush_receive_log_status( + rpt, + "machine GUID is not allowed from this IP", + "NOT ALLOWED IP PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } + simple_pattern_free(machine_allow_from); } } + if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + + rrdpush_receive_log_status( + rpt, + "machine GUID is my own", + "LOCALHOST PERMISSION DENIED"); + + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); + + if(send_timeout( +#ifdef ENABLE_HTTPS + &rpt->ssl, +#endif + rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { + + error("STREAM '%s' [receive from [%s]:%s]: " + "failed to reply." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + } + + close(rpt->fd); + receiver_state_free(rpt); + return web_client_socket_is_now_used_for_streaming(w); + } + if(unlikely(web_client_streaming_rate_t > 0)) { - static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER; - static volatile time_t last_stream_accepted_t = 0; + static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; + static time_t last_stream_accepted_t = 0; - netdata_mutex_lock(&stream_rate_mutex); time_t now = now_realtime_sec(); + netdata_spinlock_lock(&spinlock); if(unlikely(last_stream_accepted_t == 0)) last_stream_accepted_t = now; if(now - last_stream_accepted_t < web_client_streaming_rate_t) { - netdata_mutex_unlock(&stream_rate_mutex); - rrdhost_system_info_free(system_info); - error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + netdata_spinlock_unlock(&spinlock); + + char msg[100 + 1]; + snprintfz(msg, 100, + "rate limit, will accept new connection in %ld secs", + (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + + rrdpush_receive_log_status( + rpt, + msg, + "RATE LIMIT TRY LATER"); + + receiver_state_free(rpt); return rrdpush_receiver_too_busy_now(w); } last_stream_accepted_t = now; - netdata_mutex_unlock(&stream_rate_mutex); + netdata_spinlock_unlock(&spinlock); } /* @@ -867,117 +984,85 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a * lookup to the now-attached structure). */ - struct receiver_state *rpt = callocz(1, sizeof(*rpt)); - rrd_rdlock(); - RRDHOST *host = rrdhost_find_by_guid(machine_guid); - if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ - host = NULL; - if (host) { - rrdhost_wrlock(host); - netdata_mutex_lock(&host->receiver_lock); - rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); - host->senders_disconnected_time = 0; - if (host->receiver != NULL) { - time_t age = now_realtime_sec() - host->receiver->last_msg_t; - if (age > 30) { - host->receiver->shutdown = 1; - shutdown(host->receiver->fd, SHUT_RDWR); - host->receiver = NULL; // Thread holds reference to structure - info( - "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " - "existing connection is dead (%"PRId64" sec), accepting new connection.", - rrdhost_hostname(host), - w->client_ip, - w->client_port, - (int64_t)age); - } - else { - netdata_mutex_unlock(&host->receiver_lock); - rrdhost_unlock(host); - rrd_unlock(); - log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host), - "REJECTED - ALREADY CONNECTED"); - info( - "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " - "existing connection is active (within last %"PRId64" sec), rejecting new connection.", - rrdhost_hostname(host), - w->client_ip, - w->client_port, - (int64_t)age); - // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up - buffer_flush(w->response.data); - buffer_strcat(w->response.data, "This GUID is already streaming to this server"); - freez(rpt); - return 409; + { + time_t age = 0; + bool receiver_stale = false; + bool receiver_working = false; + + rrd_rdlock(); + RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid); + if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ + host = NULL; + + if (host) { + netdata_mutex_lock(&host->receiver_lock); + if (host->receiver) { + age = now_realtime_sec() - host->receiver->last_msg_t; + + if (age < 30) + receiver_working = true; + else + receiver_stale = true; } + netdata_mutex_unlock(&host->receiver_lock); } - host->receiver = rpt; - netdata_mutex_unlock(&host->receiver_lock); - rrdhost_unlock(host); - } - rrd_unlock(); - - rpt->last_msg_t = now_realtime_sec(); - - rpt->host = host; - rpt->fd = w->ifd; - rpt->key = strdupz(key); - rpt->hostname = strdupz(hostname); - rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); - rpt->machine_guid = strdupz(machine_guid); - rpt->os = strdupz(os); - rpt->timezone = strdupz(timezone); - rpt->abbrev_timezone = strdupz(abbrev_timezone); - rpt->utc_offset = utc_offset; - rpt->tags = (tags)?strdupz(tags):NULL; - rpt->client_ip = strdupz(w->client_ip); - rpt->client_port = strdupz(w->client_port); - rpt->update_every = update_every; - rpt->system_info = system_info; - rpt->capabilities = stream_version; -#ifdef ENABLE_HTTPS - rpt->ssl.conn = w->ssl.conn; - rpt->ssl.flags = w->ssl.flags; - - w->ssl.conn = NULL; - w->ssl.flags = NETDATA_SSL_START; -#endif - - if(w->user_agent && w->user_agent[0]) { - char *t = strchr(w->user_agent, '/'); - if(t && *t) { - *t = '\0'; - t++; + rrd_unlock(); + + if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) { + // we stopped the receiver + // we can proceed with this connection + receiver_stale = false; + + info("STREAM '%s' [receive from [%s]:%s]: " + "stopped previous stale receiver to accept this one." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } - rpt->program_name = strdupz(w->user_agent); - if(t && *t) rpt->program_version = strdupz(t); + if (receiver_working || receiver_stale) { + // another receiver is already connected + // try again later + + char msg[200 + 1]; + snprintfz(msg, 200, + "multiple connections for same host, " + "old connection was used %ld secs ago%s", + age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)"); + + rrdpush_receive_log_status( + rpt, + msg, + "ALREADY CONNECTED"); + + // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "This GUID is already streaming to this server"); + receiver_state_free(rpt); + return HTTP_RESP_CONFLICT; + } } - - debug(D_SYSTEM, "starting STREAM receive thread."); char tag[FILENAME_MAX + 1]; - snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); - - if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) - error("Failed to create new STREAM receive thread for client."); - - // prevent the caller from closing the streaming socket - if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { - web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); - } - else { - if(w->ifd == w->ofd) - w->ifd = w->ofd = -1; - else - w->ifd = -1; + snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); + + if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { + rrdpush_receive_log_status( + rpt, + "can't create receiver thread", + "INTERNAL SERVER ERROR"); + + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "Can't handle this request"); + receiver_state_free(rpt); + return HTTP_RESP_INTERNAL_SERVER_ERROR; } - buffer_flush(w->response.data); - return 200; + // prevent the caller from closing the streaming socket + return web_client_socket_is_now_used_for_streaming(w); } static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { @@ -995,7 +1080,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) } void log_receiver_capabilities(struct receiver_state *rpt) { - BUFFER *wb = buffer_create(100); + BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", @@ -1005,7 +1090,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) { } void log_sender_capabilities(struct sender_state *s) { - BUFFER *wb = buffer_create(100); + BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index a0c7e8de..94c1320e 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -3,12 +3,14 @@ #ifndef NETDATA_RRDPUSH_H #define NETDATA_RRDPUSH_H 1 -#include "database/rrd.h" #include "libnetdata/libnetdata.h" -#include "web/server/web_client.h" #include "daemon/common.h" +#include "web/server/web_client.h" +#include "database/rrd.h" #define CONNECTED_TO_SIZE 100 +#define CBUFFER_INITIAL_SIZE (16 * 1024) +#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2) // ---------------------------------------------------------------------------- // obsolete versions - do not use anymore @@ -22,6 +24,9 @@ typedef enum { // do not use the first 3 bits + // they used to be versions 1, 2 and 3 + // before we introduce capabilities + STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels) STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol) @@ -37,6 +42,7 @@ typedef enum { STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data + STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; @@ -125,8 +131,8 @@ struct decompressor_state { // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. typedef enum { - SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression + SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown + SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct sender_state { @@ -155,6 +161,8 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; + uint16_t hops; + #ifdef ENABLE_COMPRESSION struct compressor_state *compressor; #endif @@ -163,6 +171,11 @@ struct sender_state { #endif struct { + bool shutdown; + const char *reason; + } exit; + + struct { DICTIONARY *requests; // de-duplication of replication requests, per chart struct { @@ -176,9 +189,13 @@ struct sender_state { struct { size_t buffer_used_percentage; // the current utilization of the sending buffer usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; }; +#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED) +#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED) + #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST) #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST) @@ -216,13 +233,34 @@ struct receiver_state { char *program_name; // Duplicated in pluginsd char *program_version; struct rrdhost_system_info *system_info; - int update_every; STREAM_CAPABILITIES capabilities; time_t last_msg_t; char read_buffer[PLUGINSD_LINE_MAX + 1]; int read_len; - unsigned int shutdown:1; // Tell the thread to exit - unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) + + uint16_t hops; + + struct { + bool shutdown; // signal the streaming parser to exit + const char *reason; // the reason of disconnection to log + } exit; + + struct { + RRD_MEMORY_MODE mode; + int history; + int update_every; + int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO + time_t alarms_delay; + int rrdpush_enabled; + char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig + char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig + bool rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step; + char *rrdpush_destination; // DONT FREE - it is allocated in appconfig + unsigned int rrdpush_compression; + } config; + #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif @@ -260,11 +298,8 @@ extern unsigned int remote_clock_resync_iterations; void rrdpush_destinations_init(RRDHOST *host); void rrdpush_destinations_free(RRDHOST *host); -void sender_init(RRDHOST *host); - BUFFER *sender_start(struct sender_state *s); void sender_commit(struct sender_state *s, BUFFER *wb); -void sender_cancel(struct sender_state *s); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); @@ -274,8 +309,11 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_claimed_id(RRDHOST *host); +#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended +#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended + int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); -void rrdpush_sender_thread_stop(RRDHOST *host); +void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); @@ -295,11 +333,17 @@ struct compressor_state *create_compressor(); struct decompressor_state *create_decompressor(); #endif +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); int32_t stream_capabilities_to_vn(uint32_t caps); +void receiver_state_free(struct receiver_state *rpt); +bool stop_streaming_receiver(RRDHOST *host, const char *reason); + +void sender_thread_buffer_free(void); + #include "replication.h" #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 62097e39..854b57fc 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -36,36 +36,41 @@ extern char *netdata_ssl_ca_file; static __thread BUFFER *sender_thread_buffer = NULL; static __thread bool sender_thread_buffer_used = false; +static __thread time_t sender_thread_buffer_last_reset_s = 0; void sender_thread_buffer_free(void) { - if(sender_thread_buffer) { - buffer_free(sender_thread_buffer); - sender_thread_buffer = NULL; - } + buffer_free(sender_thread_buffer); + sender_thread_buffer = NULL; + sender_thread_buffer_used = false; } // Collector thread starting a transmission -BUFFER *sender_start(struct sender_state *s __maybe_unused) { - if(!sender_thread_buffer) - sender_thread_buffer = buffer_create(1024); - - if(sender_thread_buffer_used) +BUFFER *sender_start(struct sender_state *s) { + if(unlikely(sender_thread_buffer_used)) fatal("STREAMING: thread buffer is used multiple times concurrently."); + if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) { + if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) { + buffer_free(sender_thread_buffer); + sender_thread_buffer = NULL; + } + } + + if(unlikely(!sender_thread_buffer)) { + sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming); + sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s); + } + sender_thread_buffer_used = true; buffer_flush(sender_thread_buffer); return sender_thread_buffer; } -void sender_cancel(struct sender_state *s __maybe_unused) { - sender_thread_buffer_used = false; -} - static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); #ifdef ENABLE_COMPRESSION /* -* In case of stream compression buffer oveflow +* In case of stream compression buffer overflow * Inform the user through the error log file and * deactivate compression by downgrading the stream protocol. */ @@ -99,11 +104,11 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { netdata_mutex_lock(&s->mutex); - if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { + if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", - rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); + rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); - s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; + s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } #ifdef ENABLE_COMPRESSION @@ -150,17 +155,17 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { } } - if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len)) + if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) s->flags |= SENDER_FLAG_OVERFLOW; src = src + size_to_compress; src_len -= size_to_compress; } } - else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + else if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; #else - if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + if(cbuffer_add_unsafe(s->buffer, src, src_len)) s->flags |= SENDER_FLAG_OVERFLOW; #endif @@ -186,6 +191,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU BUFFER *wb = sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer(wb, rva); sender_commit(host->sender, wb); + sender_thread_buffer_free(); } } @@ -214,6 +220,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp); (void)ret; sender_commit(host->sender, wb); + sender_thread_buffer_free(); debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); } @@ -222,14 +229,12 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { // resets all the chart, so that their definitions // will be resent to the central netdata static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { - error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host)); - RRDSET *st; rrdset_foreach_read(st, host) { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - st->upstream_resync_time = 0; + st->upstream_resync_time_s = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) @@ -241,6 +246,30 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { rrdhost_sender_replicating_charts_zero(host); } +static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) { + static __thread time_t last_reset_time_s = 0; + + if(!force && now_s - last_reset_time_s < 300) + return; + + if(!have_mutex) + netdata_mutex_lock(&s->mutex); + + rrdpush_sender_last_buffer_recreate_set(s, now_s); + last_reset_time_s = now_s; + + if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) { + size_t max = s->buffer->max_size; + cbuffer_free(s->buffer); + s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming); + } + + sender_thread_buffer_free(); + + if(!have_mutex) + netdata_mutex_unlock(&s->mutex); +} + static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { rrdpush_sender_set_flush_time(host->sender); @@ -248,6 +277,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); + rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true); replication_recalculate_buffer_used_ratio_unsafe(host->sender); netdata_mutex_unlock(&host->sender->mutex); @@ -268,6 +298,9 @@ static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { static void rrdpush_sender_on_connect(RRDHOST *host) { rrdpush_sender_cbuffer_flush(host); rrdpush_sender_charts_and_replication_reset(host); +} + +static void rrdpush_sender_after_connect(RRDHOST *host) { rrdpush_sender_thread_send_custom_host_variables(host); } @@ -418,13 +451,17 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } - error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error); - worker_is_busy(worker_job_id); rrdpush_sender_thread_close_socket(host); host->destination->last_error = error; host->destination->last_handshake = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; + + char buf[LOG_DATE_LENGTH]; + log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); + error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); + return false; } @@ -449,11 +486,11 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p ); if(unlikely(s->rrdpush_sender_socket == -1)) { - error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); + // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); return false; } - info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); + // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); #ifdef ENABLE_HTTPS if(netdata_ssl_client_ctx){ @@ -499,6 +536,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p stream_encoded_t se; rrdpush_encode_variable(&se, host); + host->sender->hops = host->system_info->hops + 1; + char http[HTTP_HEADER_SIZE + 1]; int eol = snprintfz(http, HTTP_HEADER_SIZE, "STREAM " @@ -557,7 +596,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p , rrdhost_timezone(host) , rrdhost_abbrev_timezone(host) , host->utc_offset - , host->system_info->hops + 1 + , host->sender->hops , host->system_info->ml_capable , host->system_info->ml_enabled , host->system_info->mc_version @@ -657,7 +696,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; } - info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to); + // info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to); bytes = recv_timeout( #ifdef ENABLE_HTTPS @@ -726,6 +765,8 @@ static bool attempt_to_connect(struct sender_state *state) // let the data collection threads know we are ready rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + rrdpush_sender_after_connect(state->host); + return true; } @@ -738,7 +779,13 @@ static bool attempt_to_connect(struct sender_state *state) state->sent_bytes_on_this_connection = 0; // slow re-connection on repeating errors - sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds + usec_t now_ut = now_monotonic_usec(); + usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay; + while(now_ut < end_ut) { + netdata_thread_testcancel(); + sleep_usec(500 * USEC_PER_MS); // seconds + now_ut = now_monotonic_usec(); + } return false; } @@ -757,8 +804,8 @@ static ssize_t attempt_to_send(struct sender_state *s) { debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS - SSL *conn = s->host->sender->ssl.conn ; - if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + SSL *conn = s->ssl.conn ; + if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) ret = netdata_ssl_write(conn, chunk, outstanding); else ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); @@ -793,16 +840,18 @@ static ssize_t attempt_read(struct sender_state *s) { ssize_t ret = 0; #ifdef ENABLE_HTTPS - if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { + if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { size_t desired = sizeof(s->read_buffer) - s->read_len - 1; - ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired); + ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired); if (ret > 0 ) { s->read_len += (int)ret; return ret; } - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - rrdpush_sender_thread_close_socket(s->host); + if (ret == -1) { + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); + rrdpush_sender_thread_close_socket(s->host); + } return ret; } #endif @@ -852,6 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { pluginsd_function_result_end_to_buffer(wb); sender_commit(s, wb); + sender_thread_buffer_free(); internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", rrdhost_hostname(s->host), s->connected_to, @@ -876,7 +926,7 @@ void execute_commands(struct sender_state *s) { log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); - internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); + // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); char *words[PLUGINSD_MAX_WORDS] = { NULL }; size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); @@ -906,7 +956,7 @@ void execute_commands(struct sender_state *s) { tmp->received_ut = now_realtime_usec(); tmp->sender = s; tmp->transaction = string_strdupz(transaction); - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1); + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); if(code != HTTP_RESP_OK) { @@ -1019,59 +1069,83 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { } } -static void rrdpush_sender_thread_cleanup_callback(void *ptr) { - struct rrdpush_sender_thread_data *data = ptr; - worker_unregister(); - - RRDHOST *host = data->host; +static bool rrdhost_set_sender(RRDHOST *host) { + if(unlikely(!host->sender)) return false; + bool ret = false; netdata_mutex_lock(&host->sender->mutex); + if(!host->sender->tid) { + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + host->sender->tid = gettid(); + ret = true; + } + netdata_mutex_unlock(&host->sender->mutex); + + return ret; +} - info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host)); +static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { + if(unlikely(!host->sender)) return; - rrdpush_sender_thread_close_socket(host); - rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); + if(host->sender->tid == gettid()) { + host->sender->tid = 0; + host->sender->exit.shutdown = false; + host->sender->exit.reason = NULL; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + } +} - if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) { - info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host)); - netdata_thread_detach(netdata_thread_self()); +static bool rrdhost_sender_should_exit(struct sender_state *s) { + // check for outstanding cancellation requests + netdata_thread_testcancel(); + + if(unlikely(!service_running(SERVICE_STREAMING))) { + if(!s->exit.reason) + s->exit.reason = "NETDATA EXIT"; + return true; } - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) { + if(!s->exit.reason) + s->exit.reason = "NON STREAMABLE HOST"; + return true; + } - info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host)); + if(unlikely(s->exit.shutdown)) { + if(!s->exit.reason) + s->exit.reason = "SENDER SHUTDOWN REQUESTED"; + return true; + } - netdata_mutex_unlock(&host->sender->mutex); + if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { + if(!s->exit.reason) + s->exit.reason = "RECEIVER LEFT"; + return true; + } - freez(data->pipe_buffer); - freez(data); + return false; } -void sender_init(RRDHOST *host) -{ - if (host->sender) - return; +static void rrdpush_sender_thread_cleanup_callback(void *ptr) { + struct rrdpush_sender_thread_data *s = ptr; + worker_unregister(); + + RRDHOST *host = s->host; - host->sender = callocz(1, sizeof(*host->sender)); - host->sender->host = host; - host->sender->buffer = cbuffer_new(1024, 1024 * 1024); - host->sender->capabilities = STREAM_OUR_CAPABILITIES; + netdata_mutex_lock(&host->sender->mutex); + info("STREAM %s [send]: sending thread exits %s", + rrdhost_hostname(host), + host->sender->exit.reason ? host->sender->exit.reason : ""); - host->sender->rrdpush_sender_pipe[PIPE_READ] = -1; - host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1; - host->sender->rrdpush_sender_socket = -1; + rrdpush_sender_thread_close_socket(host); + rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); -#ifdef ENABLE_COMPRESSION - if(default_compression_enabled) { - host->sender->flags |= SENDER_FLAG_COMPRESSION; - host->sender->compressor = create_compressor(); - } - else - host->sender->flags &= ~SENDER_FLAG_COMPRESSION; -#endif + rrdhost_clear_sender___while_having_sender_mutex(host); + netdata_mutex_unlock(&host->sender->mutex); - netdata_mutex_init(&host->sender->mutex); - replication_init_sender(host->sender); + freez(s->pipe_buffer); + freez(s); } void *rrdpush_sender_thread(void *ptr) { @@ -1103,13 +1177,18 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); struct sender_state *s = ptr; - s->tid = gettid(); if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", - rrdhost_hostname(s->host), s->tid); + rrdhost_hostname(s->host), gettid()); + return NULL; + } + + if(!rrdhost_set_sender(s->host)) { + error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", + rrdhost_hostname(s->host), gettid()); return NULL; } @@ -1125,7 +1204,7 @@ void *rrdpush_sender_thread(void *ptr) { } #endif - info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid); + info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); s->timeout = (int)appconfig_get_number( &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600); @@ -1166,28 +1245,33 @@ void *rrdpush_sender_thread(void *ptr) { thread_data->sender_state = s; thread_data->host = s->host; - // reset our cleanup flags - rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); - netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data); - for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) { - // check for outstanding cancellation requests - netdata_thread_testcancel(); + size_t iterations = 0; + time_t now_s = now_monotonic_sec(); + while(!rrdhost_sender_should_exit(s)) { + iterations++; // The connection attempt blocks (after which we use the socket in nonblocking) if(unlikely(s->rrdpush_sender_socket == -1)) { worker_is_busy(WORKER_SENDER_JOB_CONNECT); + + now_s = now_monotonic_sec(); + rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true); + rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); s->flags &= ~SENDER_FLAG_OVERFLOW; s->read_len = 0; s->buffer->read = 0; s->buffer->write = 0; - if(unlikely(!attempt_to_connect(s))) + if(!attempt_to_connect(s)) continue; - s->last_traffic_seen_t = now_monotonic_sec(); + if(rrdhost_sender_should_exit(s)) + break; + + now_s = s->last_traffic_seen_t = now_monotonic_sec(); rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); @@ -1197,8 +1281,11 @@ void *rrdpush_sender_thread(void *ptr) { continue; } + if(iterations % 1000 == 0) + now_s = now_monotonic_sec(); + // If the TCP window never opened then something is wrong, restart connection - if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout && + if(unlikely(now_s - s->last_traffic_seen_t > s->timeout && !rrdpush_sender_pending_replication_requests(s) && !rrdpush_sender_replicating_charts(s) )) { @@ -1209,11 +1296,13 @@ void *rrdpush_sender_thread(void *ptr) { } netdata_mutex_lock(&s->mutex); - size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL); - size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); + size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); + size_t available = cbuffer_available_size_unsafe(s->buffer); + if (unlikely(!outstanding)) + rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); netdata_mutex_unlock(&s->mutex); - worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size); + worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); if(outstanding) s->send_attempts++; @@ -1246,12 +1335,14 @@ void *rrdpush_sender_thread(void *ptr) { .revents = 0, } }; + int poll_rc = poll(fds, 2, 1000); debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); - if(unlikely(netdata_exit)) break; + if(unlikely(rrdhost_sender_should_exit(s))) + break; internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ], "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to); @@ -1261,7 +1352,9 @@ void *rrdpush_sender_thread(void *ptr) { // Spurious wake-ups without error - loop again if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) { + netdata_thread_testcancel(); debug(D_STREAM, "Spurious wakeup"); + now_s = now_monotonic_sec(); continue; } |