summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c619
1 files changed, 352 insertions, 267 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index a57f1b080..256fa8282 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -108,7 +108,7 @@ int rrdpush_init() {
default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
- rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
+ rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
#ifdef ENABLE_COMPRESSION
default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
@@ -295,40 +295,14 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
- time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
- time_t last_entry_local = st->last_updated.tv_sec;
-
- if(unlikely(!last_entry_local))
- last_entry_local = rrdset_last_entry_t(st);
+ time_t db_first_time_t, db_last_time_t;
time_t now = now_realtime_sec();
- if(unlikely(last_entry_local > now)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- last_entry_local, now);
- last_entry_local = now;
- }
-
- if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- first_entry_local, last_entry_local);
- first_entry_local = last_entry_local - st->update_every;
- }
-
- if(unlikely(!first_entry_local && last_entry_local)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- first_entry_local, last_entry_local);
- first_entry_local = last_entry_local;
- }
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0);
buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
- (unsigned long long)first_entry_local,
- (unsigned long long)last_entry_local,
+ (unsigned long long)db_first_time_t,
+ (unsigned long long)db_last_time_t,
(unsigned long long)now);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
@@ -342,17 +316,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
#endif
}
- st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
return replication_progress;
}
// sends the current chart dimensions
-static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time)
+ if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
buffer_print_llu(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -399,6 +373,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
return true;
}
@@ -463,6 +438,8 @@ void rrdpush_send_host_labels(RRDHOST *host) {
buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
sender_commit(host->sender, wb);
+
+ sender_thread_buffer_free();
}
void rrdpush_claimed_id(RRDHOST *host)
@@ -480,6 +457,8 @@ void rrdpush_claimed_id(RRDHOST *host)
rrdhost_aclk_state_unlock(host);
sender_commit(host->sender, wb);
+
+ sender_thread_buffer_free();
}
int connect_to_one_of_destinations(
@@ -496,20 +475,11 @@ int connect_to_one_of_destinations(
for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
time_t now = now_realtime_sec();
- if(d->postpone_reconnection_until > now) {
- info(
- "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds",
- rrdhost_hostname(host),
- string2str(d->destination),
- default_port,
- d->last_handshake, d->last_error?d->last_error:"unset reason description",
- (int)(d->postpone_reconnection_until - now));
-
+ if(d->postpone_reconnection_until > now)
continue;
- }
info(
- "STREAM %s: attempting to connect to '%s' (default port: %d)...",
+ "STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host),
string2str(d->destination),
default_port);
@@ -528,8 +498,8 @@ int connect_to_one_of_destinations(
// move the current item to the end of the list
// without this, this destination will break the loop again and again
// not advancing the destinations to find one that may work
- DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next);
- DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next);
break;
}
@@ -550,7 +520,9 @@ bool destinations_init_add_one(char *entry, void *data) {
struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
d->destination = string_strdupz(entry);
- DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next);
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
+
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
@@ -577,9 +549,10 @@ void rrdpush_destinations_init(RRDHOST *host) {
void rrdpush_destinations_free(RRDHOST *host) {
while (host->destinations) {
struct rrdpush_destinations *tmp = host->destinations;
- DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next);
string_freez(tmp->destination);
freez(tmp);
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
}
host->destinations = NULL;
@@ -590,25 +563,16 @@ void rrdpush_destinations_free(RRDHOST *host) {
// Either the receiver lost the connection or the host is being destroyed.
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
-void rrdpush_sender_thread_stop(RRDHOST *host) {
-
+void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
if (!host->sender)
return;
netdata_mutex_lock(&host->sender->mutex);
- netdata_thread_t thr = 0;
if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
-
- info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host));
-
- // signal the thread that we want to join it
- rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
- // copy the thread id, so that we will be waiting for the right one
- // even if a new one has been spawn
- thr = host->rrdpush_sender_thread;
+ host->sender->exit.shutdown = true;
+ host->sender->exit.reason = reason;
// signal it to cancel
netdata_thread_cancel(host->rrdpush_sender_thread);
@@ -616,11 +580,14 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_mutex_unlock(&host->sender->mutex);
- if(thr != 0) {
- info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host));
- void *result;
- netdata_thread_join(thr, &result);
- info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host));
+ if(wait) {
+ netdata_mutex_lock(&host->sender->mutex);
+ while(host->sender->tid) {
+ netdata_mutex_unlock(&host->sender->mutex);
+ sleep_usec(10 * USEC_PER_MS);
+ netdata_mutex_lock(&host->sender->mutex);
+ }
+ netdata_mutex_unlock(&host->sender->mutex);
}
}
@@ -638,9 +605,9 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
- if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
@@ -654,7 +621,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
- return 401;
+ return HTTP_RESP_UNAUTHORIZED;
}
int rrdpush_receiver_too_busy_now(struct web_client *w) {
@@ -662,21 +629,42 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
- return 503;
+ return HTTP_RESP_SERVICE_UNAVAILABLE;
}
void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
- info("clients wants to STREAM metrics.");
- char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL;
- int32_t utc_offset = 0;
- int update_every = default_rrd_update_every;
- uint32_t stream_version = UINT_MAX;
- char buf[GUID_LEN + 1];
+ if(!service_running(ABILITY_STREAMING_CONNECTIONS))
+ return rrdpush_receiver_too_busy_now(w);
+
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+ rpt->last_msg_t = now_realtime_sec();
+ rpt->capabilities = STREAM_CAP_INVALID;
+ rpt->hops = 1;
+
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
+
+ rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ rpt->system_info->hops = rpt->hops;
+
+ rpt->fd = w->ifd;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+
+ rpt->config.update_every = default_rrd_update_every;
+
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.flags = w->ssl.flags;
+
+ w->ssl.conn = NULL;
+ w->ssl.flags = NETDATA_SSL_START;
+#endif
+
+ // parse the parameters and fill rpt and rpt->system_info
- struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
- system_info->hops = 1;
while(url) {
char *value = mystrsep(&url, "&");
if(!value || !*value) continue;
@@ -685,178 +673,307 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
if(!name || !*name) continue;
if(!value || !*value) continue;
- if(!strcmp(name, "key"))
- key = value;
- else if(!strcmp(name, "hostname"))
- hostname = value;
- else if(!strcmp(name, "registry_hostname"))
- registry_hostname = value;
- else if(!strcmp(name, "machine_guid"))
- machine_guid = value;
+ if(!strcmp(name, "key") && !rpt->key)
+ rpt->key = strdupz(value);
+
+ else if(!strcmp(name, "hostname") && !rpt->hostname)
+ rpt->hostname = strdupz(value);
+
+ else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(value);
+
+ else if(!strcmp(name, "machine_guid") && !rpt->machine_guid)
+ rpt->machine_guid = strdupz(value);
+
else if(!strcmp(name, "update_every"))
- update_every = (int)strtoul(value, NULL, 0);
- else if(!strcmp(name, "os"))
- os = value;
- else if(!strcmp(name, "timezone"))
- timezone = value;
- else if(!strcmp(name, "abbrev_timezone"))
- abbrev_timezone = value;
+ rpt->config.update_every = (int)strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "os") && !rpt->os)
+ rpt->os = strdupz(value);
+
+ else if(!strcmp(name, "timezone") && !rpt->timezone)
+ rpt->timezone = strdupz(value);
+
+ else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone)
+ rpt->abbrev_timezone = strdupz(value);
+
else if(!strcmp(name, "utc_offset"))
- utc_offset = (int32_t)strtol(value, NULL, 0);
+ rpt->utc_offset = (int32_t)strtol(value, NULL, 0);
+
else if(!strcmp(name, "hops"))
- system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+ rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+
else if(!strcmp(name, "ml_capable"))
- system_info->ml_capable = strtoul(value, NULL, 0);
+ rpt->system_info->ml_capable = strtoul(value, NULL, 0);
+
else if(!strcmp(name, "ml_enabled"))
- system_info->ml_enabled = strtoul(value, NULL, 0);
+ rpt->system_info->ml_enabled = strtoul(value, NULL, 0);
+
else if(!strcmp(name, "mc_version"))
- system_info->mc_version = strtoul(value, NULL, 0);
- else if(!strcmp(name, "tags"))
- tags = value;
- else if(!strcmp(name, "ver"))
- stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ rpt->system_info->mc_version = strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "tags") && !rpt->tags)
+ rpt->tags = strdupz(value);
+
+ else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
name = "NETDATA_HOST_OS_NAME";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
name = "NETDATA_HOST_OS_ID";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
name = "NETDATA_HOST_OS_ID_LIKE";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
name = "NETDATA_HOST_OS_VERSION";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
name = "NETDATA_HOST_OS_VERSION_ID";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
name = "NETDATA_HOST_OS_DETECTION";
- else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
- stream_version = convert_stream_version_to_capabilities(1);
- }
- if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
- w->client_ip, w->client_port, name, value);
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(1);
+
+ if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "request has parameter '%s' = '%s', which is not used."
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
+ , rpt->client_ip, rpt->client_port
+ , name, value);
}
}
}
- if (stream_version == UINT_MAX)
- stream_version = convert_stream_version_to_capabilities(0);
+ if (rpt->capabilities & STREAM_CAP_INVALID)
+ // no version is supplied, assume version 0;
+ rpt->capabilities = convert_stream_version_to_capabilities(0);
- if(!key || !*key) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
- error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
- return rrdpush_receiver_permission_denied(w);
+ // find the program name and version
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
}
- if(!hostname || !*hostname) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
- error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
+ // check if we should accept this connection
+
+ if(!rpt->key || !*rpt->key) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without an API key",
+ "NO API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(!machine_guid || !*machine_guid) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
+ if(!rpt->hostname || !*rpt->hostname) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without a hostname",
+ "NO HOSTNAME PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(regenerate_guid(key, buf) == -1) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
+ if(!rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(rpt->hostname);
+
+ if(!rpt->machine_guid || !*rpt->machine_guid) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without a machine GUID",
+ "NO MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(regenerate_guid(machine_guid, buf) == -1) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
- return rrdpush_receiver_permission_denied(w);
+ {
+ char buf[GUID_LEN + 1];
+
+ if (regenerate_guid(rpt->key, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not a valid UUID (use the command uuidgen to generate one)",
+ "INVALID API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if (regenerate_guid(rpt->machine_guid, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not a valid UUID",
+ "INVALID MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
}
- const char *api_key_type = appconfig_get(&stream_config, key, "type", "api");
+ const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api");
if(!api_key_type || !*api_key_type) api_key_type = "unknown";
if(strcmp(api_key_type, "api") != 0) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY");
- error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type);
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is a machine GUID",
+ "INVALID API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not enabled",
+ "API KEY DISABLED PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
{
- SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->key, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT);
+
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
simple_pattern_free(key_allow_from);
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
+
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not allowed from this IP",
+ "NOT ALLOWED IP PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
+
simple_pattern_free(key_allow_from);
}
}
- const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine");
- if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
- if(strcmp(machine_guid_type, "machine") != 0) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type);
- return rrdpush_receiver_permission_denied(w);
+ {
+ const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine");
+ if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
+
+ if (strcmp(machine_guid_type, "machine") != 0) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is an API key",
+ "INVALID MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
}
- if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not enabled",
+ "MACHINE GUID DISABLED PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
{
- SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT);
+
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
simple_pattern_free(machine_allow_from);
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
- error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not allowed from this IP",
+ "NOT ALLOWED IP PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
+
simple_pattern_free(machine_allow_from);
}
}
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is my own",
+ "LOCALHOST PERMISSION DENIED");
+
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+
+ if(send_timeout(
+#ifdef ENABLE_HTTPS
+ &rpt->ssl,
+#endif
+ rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+ }
+
+ close(rpt->fd);
+ receiver_state_free(rpt);
+ return web_client_socket_is_now_used_for_streaming(w);
+ }
+
if(unlikely(web_client_streaming_rate_t > 0)) {
- static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
- static volatile time_t last_stream_accepted_t = 0;
+ static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
+ static time_t last_stream_accepted_t = 0;
- netdata_mutex_lock(&stream_rate_mutex);
time_t now = now_realtime_sec();
+ netdata_spinlock_lock(&spinlock);
if(unlikely(last_stream_accepted_t == 0))
last_stream_accepted_t = now;
if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
- netdata_mutex_unlock(&stream_rate_mutex);
- rrdhost_system_info_free(system_info);
- error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+ netdata_spinlock_unlock(&spinlock);
+
+ char msg[100 + 1];
+ snprintfz(msg, 100,
+ "rate limit, will accept new connection in %ld secs",
+ (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+
+ rrdpush_receive_log_status(
+ rpt,
+ msg,
+ "RATE LIMIT TRY LATER");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_too_busy_now(w);
}
last_stream_accepted_t = now;
- netdata_mutex_unlock(&stream_rate_mutex);
+ netdata_spinlock_unlock(&spinlock);
}
/*
@@ -867,117 +984,85 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
* lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
* lookup to the now-attached structure).
*/
- struct receiver_state *rpt = callocz(1, sizeof(*rpt));
- rrd_rdlock();
- RRDHOST *host = rrdhost_find_by_guid(machine_guid);
- if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
- host = NULL;
- if (host) {
- rrdhost_wrlock(host);
- netdata_mutex_lock(&host->receiver_lock);
- rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
- host->senders_disconnected_time = 0;
- if (host->receiver != NULL) {
- time_t age = now_realtime_sec() - host->receiver->last_msg_t;
- if (age > 30) {
- host->receiver->shutdown = 1;
- shutdown(host->receiver->fd, SHUT_RDWR);
- host->receiver = NULL; // Thread holds reference to structure
- info(
- "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
- "existing connection is dead (%"PRId64" sec), accepting new connection.",
- rrdhost_hostname(host),
- w->client_ip,
- w->client_port,
- (int64_t)age);
- }
- else {
- netdata_mutex_unlock(&host->receiver_lock);
- rrdhost_unlock(host);
- rrd_unlock();
- log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host),
- "REJECTED - ALREADY CONNECTED");
- info(
- "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
- "existing connection is active (within last %"PRId64" sec), rejecting new connection.",
- rrdhost_hostname(host),
- w->client_ip,
- w->client_port,
- (int64_t)age);
- // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
- buffer_flush(w->response.data);
- buffer_strcat(w->response.data, "This GUID is already streaming to this server");
- freez(rpt);
- return 409;
+ {
+ time_t age = 0;
+ bool receiver_stale = false;
+ bool receiver_working = false;
+
+ rrd_rdlock();
+ RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid);
+ if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
+ host = NULL;
+
+ if (host) {
+ netdata_mutex_lock(&host->receiver_lock);
+ if (host->receiver) {
+ age = now_realtime_sec() - host->receiver->last_msg_t;
+
+ if (age < 30)
+ receiver_working = true;
+ else
+ receiver_stale = true;
}
+ netdata_mutex_unlock(&host->receiver_lock);
}
- host->receiver = rpt;
- netdata_mutex_unlock(&host->receiver_lock);
- rrdhost_unlock(host);
- }
- rrd_unlock();
-
- rpt->last_msg_t = now_realtime_sec();
-
- rpt->host = host;
- rpt->fd = w->ifd;
- rpt->key = strdupz(key);
- rpt->hostname = strdupz(hostname);
- rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
- rpt->machine_guid = strdupz(machine_guid);
- rpt->os = strdupz(os);
- rpt->timezone = strdupz(timezone);
- rpt->abbrev_timezone = strdupz(abbrev_timezone);
- rpt->utc_offset = utc_offset;
- rpt->tags = (tags)?strdupz(tags):NULL;
- rpt->client_ip = strdupz(w->client_ip);
- rpt->client_port = strdupz(w->client_port);
- rpt->update_every = update_every;
- rpt->system_info = system_info;
- rpt->capabilities = stream_version;
-#ifdef ENABLE_HTTPS
- rpt->ssl.conn = w->ssl.conn;
- rpt->ssl.flags = w->ssl.flags;
-
- w->ssl.conn = NULL;
- w->ssl.flags = NETDATA_SSL_START;
-#endif
-
- if(w->user_agent && w->user_agent[0]) {
- char *t = strchr(w->user_agent, '/');
- if(t && *t) {
- *t = '\0';
- t++;
+ rrd_unlock();
+
+ if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) {
+ // we stopped the receiver
+ // we can proceed with this connection
+ receiver_stale = false;
+
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "stopped previous stale receiver to accept this one."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
- rpt->program_name = strdupz(w->user_agent);
- if(t && *t) rpt->program_version = strdupz(t);
+ if (receiver_working || receiver_stale) {
+ // another receiver is already connected
+ // try again later
+
+ char msg[200 + 1];
+ snprintfz(msg, 200,
+ "multiple connections for same host, "
+ "old connection was used %ld secs ago%s",
+ age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
+
+ rrdpush_receive_log_status(
+ rpt,
+ msg,
+ "ALREADY CONNECTED");
+
+ // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ receiver_state_free(rpt);
+ return HTTP_RESP_CONFLICT;
+ }
}
-
-
debug(D_SYSTEM, "starting STREAM receive thread.");
char tag[FILENAME_MAX + 1];
- snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
-
- if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
- error("Failed to create new STREAM receive thread for client.");
-
- // prevent the caller from closing the streaming socket
- if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
- web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
- }
- else {
- if(w->ifd == w->ofd)
- w->ifd = w->ofd = -1;
- else
- w->ifd = -1;
+ snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+
+ if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
+ rrdpush_receive_log_status(
+ rpt,
+ "can't create receiver thread",
+ "INTERNAL SERVER ERROR");
+
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "Can't handle this request");
+ receiver_state_free(rpt);
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
- buffer_flush(w->response.data);
- return 200;
+ // prevent the caller from closing the streaming socket
+ return web_client_socket_is_now_used_for_streaming(w);
}
static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
@@ -995,7 +1080,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
}
void log_receiver_capabilities(struct receiver_state *rpt) {
- BUFFER *wb = buffer_create(100);
+ BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
@@ -1005,7 +1090,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
}
void log_sender_capabilities(struct sender_state *s) {
- BUFFER *wb = buffer_create(100);
+ BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",