diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 619 |
1 files changed, 352 insertions, 267 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index a57f1b080..256fa8282 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -108,7 +108,7 @@ int rrdpush_init() { default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate); default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step); - rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time); + rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); #ifdef ENABLE_COMPRESSION default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, @@ -295,40 +295,14 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { - time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0); - time_t last_entry_local = st->last_updated.tv_sec; - - if(unlikely(!last_entry_local)) - last_entry_local = rrdset_last_entry_t(st); + time_t db_first_time_t, db_last_time_t; time_t now = now_realtime_sec(); - if(unlikely(last_entry_local > now)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - last_entry_local, now); - last_entry_local = now; - } - - if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - first_entry_local, last_entry_local); - first_entry_local = last_entry_local - st->update_every; - } - - if(unlikely(!first_entry_local && last_entry_local)) { - internal_error(true, - "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - first_entry_local, last_entry_local); - first_entry_local = last_entry_local; - } + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0); buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n", - (unsigned long long)first_entry_local, - (unsigned long long)last_entry_local, + (unsigned long long)db_first_time_t, + (unsigned long long)db_last_time_t, (unsigned long long)now); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); @@ -342,17 +316,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { #endif } - st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); + st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); return replication_progress; } // sends the current chart dimensions -static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time) + if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) buffer_print_llu(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); @@ -399,6 +373,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); sender_commit(host->sender, wb); + sender_thread_buffer_free(); return true; } @@ -463,6 +438,8 @@ void rrdpush_send_host_labels(RRDHOST *host) { buffer_sprintf(wb, "OVERWRITE %s\n", "labels"); sender_commit(host->sender, wb); + + sender_thread_buffer_free(); } void rrdpush_claimed_id(RRDHOST *host) @@ -480,6 +457,8 @@ void rrdpush_claimed_id(RRDHOST *host) rrdhost_aclk_state_unlock(host); sender_commit(host->sender, wb); + + sender_thread_buffer_free(); } int connect_to_one_of_destinations( @@ -496,20 +475,11 @@ int connect_to_one_of_destinations( for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) { time_t now = now_realtime_sec(); - if(d->postpone_reconnection_until > now) { - info( - "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds", - rrdhost_hostname(host), - string2str(d->destination), - default_port, - d->last_handshake, d->last_error?d->last_error:"unset reason description", - (int)(d->postpone_reconnection_until - now)); - + if(d->postpone_reconnection_until > now) continue; - } info( - "STREAM %s: attempting to connect to '%s' (default port: %d)...", + "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), default_port); @@ -528,8 +498,8 @@ int connect_to_one_of_destinations( // move the current item to the end of the list // without this, this destination will break the loop again and again // not advancing the destinations to find one that may work - DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next); - DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next); break; } @@ -550,7 +520,9 @@ bool destinations_init_add_one(char *entry, void *data) { struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); d->destination = string_strdupz(entry); - DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next); + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); @@ -577,9 +549,10 @@ void rrdpush_destinations_init(RRDHOST *host) { void rrdpush_destinations_free(RRDHOST *host) { while (host->destinations) { struct rrdpush_destinations *tmp = host->destinations; - DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next); string_freez(tmp->destination); freez(tmp); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED); } host->destinations = NULL; @@ -590,25 +563,16 @@ void rrdpush_destinations_free(RRDHOST *host) { // Either the receiver lost the connection or the host is being destroyed. // The sender mutex guards thread creation, any spurious data is wiped on reconnection. -void rrdpush_sender_thread_stop(RRDHOST *host) { - +void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { if (!host->sender) return; netdata_mutex_lock(&host->sender->mutex); - netdata_thread_t thr = 0; if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); - - info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host)); - - // signal the thread that we want to join it - rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN); - // copy the thread id, so that we will be waiting for the right one - // even if a new one has been spawn - thr = host->rrdpush_sender_thread; + host->sender->exit.shutdown = true; + host->sender->exit.reason = reason; // signal it to cancel netdata_thread_cancel(host->rrdpush_sender_thread); @@ -616,11 +580,14 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_mutex_unlock(&host->sender->mutex); - if(thr != 0) { - info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host)); - void *result; - netdata_thread_join(thr, &result); - info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host)); + if(wait) { + netdata_mutex_lock(&host->sender->mutex); + while(host->sender->tid) { + netdata_mutex_unlock(&host->sender->mutex); + sleep_usec(10 * USEC_PER_MS); + netdata_mutex_lock(&host->sender->mutex); + } + netdata_mutex_unlock(&host->sender->mutex); } } @@ -638,9 +605,9 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host)); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); - if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender)) + if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); @@ -654,7 +621,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info."); - return 401; + return HTTP_RESP_UNAUTHORIZED; } int rrdpush_receiver_too_busy_now(struct web_client *w) { @@ -662,21 +629,42 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) { // to prevent an attacker from gaining info about the error buffer_flush(w->response.data); buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later."); - return 503; + return HTTP_RESP_SERVICE_UNAVAILABLE; } void *rrdpush_receiver_thread(void *ptr); int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { - info("clients wants to STREAM metrics."); - char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL; - int32_t utc_offset = 0; - int update_every = default_rrd_update_every; - uint32_t stream_version = UINT_MAX; - char buf[GUID_LEN + 1]; + if(!service_running(ABILITY_STREAMING_CONNECTIONS)) + return rrdpush_receiver_too_busy_now(w); + + struct receiver_state *rpt = callocz(1, sizeof(*rpt)); + rpt->last_msg_t = now_realtime_sec(); + rpt->capabilities = STREAM_CAP_INVALID; + rpt->hops = 1; + + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); + + rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info)); + rpt->system_info->hops = rpt->hops; + + rpt->fd = w->ifd; + rpt->client_ip = strdupz(w->client_ip); + rpt->client_port = strdupz(w->client_port); + + rpt->config.update_every = default_rrd_update_every; + +#ifdef ENABLE_HTTPS + rpt->ssl.conn = w->ssl.conn; + rpt->ssl.flags = w->ssl.flags; + + w->ssl.conn = NULL; + w->ssl.flags = NETDATA_SSL_START; +#endif + + // parse the parameters and fill rpt and rpt->system_info - struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); - system_info->hops = 1; while(url) { char *value = mystrsep(&url, "&"); if(!value || !*value) continue; @@ -685,178 +673,307 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { if(!name || !*name) continue; if(!value || !*value) continue; - if(!strcmp(name, "key")) - key = value; - else if(!strcmp(name, "hostname")) - hostname = value; - else if(!strcmp(name, "registry_hostname")) - registry_hostname = value; - else if(!strcmp(name, "machine_guid")) - machine_guid = value; + if(!strcmp(name, "key") && !rpt->key) + rpt->key = strdupz(value); + + else if(!strcmp(name, "hostname") && !rpt->hostname) + rpt->hostname = strdupz(value); + + else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname) + rpt->registry_hostname = strdupz(value); + + else if(!strcmp(name, "machine_guid") && !rpt->machine_guid) + rpt->machine_guid = strdupz(value); + else if(!strcmp(name, "update_every")) - update_every = (int)strtoul(value, NULL, 0); - else if(!strcmp(name, "os")) - os = value; - else if(!strcmp(name, "timezone")) - timezone = value; - else if(!strcmp(name, "abbrev_timezone")) - abbrev_timezone = value; + rpt->config.update_every = (int)strtoul(value, NULL, 0); + + else if(!strcmp(name, "os") && !rpt->os) + rpt->os = strdupz(value); + + else if(!strcmp(name, "timezone") && !rpt->timezone) + rpt->timezone = strdupz(value); + + else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone) + rpt->abbrev_timezone = strdupz(value); + else if(!strcmp(name, "utc_offset")) - utc_offset = (int32_t)strtol(value, NULL, 0); + rpt->utc_offset = (int32_t)strtol(value, NULL, 0); + else if(!strcmp(name, "hops")) - system_info->hops = (uint16_t) strtoul(value, NULL, 0); + rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0); + else if(!strcmp(name, "ml_capable")) - system_info->ml_capable = strtoul(value, NULL, 0); + rpt->system_info->ml_capable = strtoul(value, NULL, 0); + else if(!strcmp(name, "ml_enabled")) - system_info->ml_enabled = strtoul(value, NULL, 0); + rpt->system_info->ml_enabled = strtoul(value, NULL, 0); + else if(!strcmp(name, "mc_version")) - system_info->mc_version = strtoul(value, NULL, 0); - else if(!strcmp(name, "tags")) - tags = value; - else if(!strcmp(name, "ver")) - stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + rpt->system_info->mc_version = strtoul(value, NULL, 0); + + else if(!strcmp(name, "tags") && !rpt->tags) + rpt->tags = strdupz(value); + + else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + else { // An old Netdata child does not have a compatible streaming protocol, map to something sane. if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) name = "NETDATA_HOST_OS_NAME"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID")) name = "NETDATA_HOST_OS_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")) name = "NETDATA_HOST_OS_ID_LIKE"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")) name = "NETDATA_HOST_OS_VERSION"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")) name = "NETDATA_HOST_OS_VERSION_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) name = "NETDATA_HOST_OS_DETECTION"; - else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) { - stream_version = convert_stream_version_to_capabilities(1); - } - if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { - info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", - w->client_ip, w->client_port, name, value); + else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(1); + + if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { + info("STREAM '%s' [receive from [%s]:%s]: " + "request has parameter '%s' = '%s', which is not used." + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" + , rpt->client_ip, rpt->client_port + , name, value); } } } - if (stream_version == UINT_MAX) - stream_version = convert_stream_version_to_capabilities(0); + if (rpt->capabilities & STREAM_CAP_INVALID) + // no version is supplied, assume version 0; + rpt->capabilities = convert_stream_version_to_capabilities(0); - if(!key || !*key) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY"); - error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port); - return rrdpush_receiver_permission_denied(w); + // find the program name and version + if(w->user_agent && w->user_agent[0]) { + char *t = strchr(w->user_agent, '/'); + if(t && *t) { + *t = '\0'; + t++; + } + + rpt->program_name = strdupz(w->user_agent); + if(t && *t) rpt->program_version = strdupz(t); } - if(!hostname || !*hostname) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME"); - error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port); + // check if we should accept this connection + + if(!rpt->key || !*rpt->key) { + rrdpush_receive_log_status( + rpt, + "request without an API key", + "NO API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(!machine_guid || !*machine_guid) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port); + if(!rpt->hostname || !*rpt->hostname) { + rrdpush_receive_log_status( + rpt, + "request without a hostname", + "NO HOSTNAME PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(regenerate_guid(key, buf) == -1) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key); + if(!rpt->registry_hostname) + rpt->registry_hostname = strdupz(rpt->hostname); + + if(!rpt->machine_guid || !*rpt->machine_guid) { + rrdpush_receive_log_status( + rpt, + "request without a machine GUID", + "NO MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(regenerate_guid(machine_guid, buf) == -1) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid); - return rrdpush_receiver_permission_denied(w); + { + char buf[GUID_LEN + 1]; + + if (regenerate_guid(rpt->key, buf) == -1) { + rrdpush_receive_log_status( + rpt, + "API key is not a valid UUID (use the command uuidgen to generate one)", + "INVALID API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if (regenerate_guid(rpt->machine_guid, buf) == -1) { + rrdpush_receive_log_status( + rpt, + "machine GUID is not a valid UUID", + "INVALID MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } } - const char *api_key_type = appconfig_get(&stream_config, key, "type", "api"); + const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api"); if(!api_key_type || !*api_key_type) api_key_type = "unknown"; if(strcmp(api_key_type, "api") != 0) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY"); - error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type); + rrdpush_receive_log_status( + rpt, + "API key is a machine GUID", + "INVALID API KEY PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } - if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); + if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) { + rrdpush_receive_log_status( + rpt, + "API key is not enabled", + "API KEY DISABLED PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } { - SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *key_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->key, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT); + if(key_allow_from) { if(!simple_pattern_matches(key_allow_from, w->client_ip)) { simple_pattern_free(key_allow_from); - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP"); - error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key); + + rrdpush_receive_log_status( + rpt, + "API key is not allowed from this IP", + "NOT ALLOWED IP PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } + simple_pattern_free(key_allow_from); } } - const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine"); - if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; - if(strcmp(machine_guid_type, "machine") != 0) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type); - return rrdpush_receiver_permission_denied(w); + { + const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine"); + if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; + + if (strcmp(machine_guid_type, "machine") != 0) { + rrdpush_receive_log_status( + rpt, + "machine GUID is an API key", + "INVALID MACHINE GUID PERMISSION DENIED"); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } } - if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) { - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED"); - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); + if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) { + rrdpush_receive_log_status( + rpt, + "machine GUID is not enabled", + "MACHINE GUID DISABLED PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } { - SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *machine_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT); + if(machine_allow_from) { if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { simple_pattern_free(machine_allow_from); - rrdhost_system_info_free(system_info); - log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP"); - error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid); + + rrdpush_receive_log_status( + rpt, + "machine GUID is not allowed from this IP", + "NOT ALLOWED IP PERMISSION DENIED"); + + receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); } + simple_pattern_free(machine_allow_from); } } + if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + + rrdpush_receive_log_status( + rpt, + "machine GUID is my own", + "LOCALHOST PERMISSION DENIED"); + + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); + + if(send_timeout( +#ifdef ENABLE_HTTPS + &rpt->ssl, +#endif + rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { + + error("STREAM '%s' [receive from [%s]:%s]: " + "failed to reply." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + } + + close(rpt->fd); + receiver_state_free(rpt); + return web_client_socket_is_now_used_for_streaming(w); + } + if(unlikely(web_client_streaming_rate_t > 0)) { - static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER; - static volatile time_t last_stream_accepted_t = 0; + static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; + static time_t last_stream_accepted_t = 0; - netdata_mutex_lock(&stream_rate_mutex); time_t now = now_realtime_sec(); + netdata_spinlock_lock(&spinlock); if(unlikely(last_stream_accepted_t == 0)) last_stream_accepted_t = now; if(now - last_stream_accepted_t < web_client_streaming_rate_t) { - netdata_mutex_unlock(&stream_rate_mutex); - rrdhost_system_info_free(system_info); - error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + netdata_spinlock_unlock(&spinlock); + + char msg[100 + 1]; + snprintfz(msg, 100, + "rate limit, will accept new connection in %ld secs", + (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + + rrdpush_receive_log_status( + rpt, + msg, + "RATE LIMIT TRY LATER"); + + receiver_state_free(rpt); return rrdpush_receiver_too_busy_now(w); } last_stream_accepted_t = now; - netdata_mutex_unlock(&stream_rate_mutex); + netdata_spinlock_unlock(&spinlock); } /* @@ -867,117 +984,85 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a * lookup to the now-attached structure). */ - struct receiver_state *rpt = callocz(1, sizeof(*rpt)); - rrd_rdlock(); - RRDHOST *host = rrdhost_find_by_guid(machine_guid); - if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ - host = NULL; - if (host) { - rrdhost_wrlock(host); - netdata_mutex_lock(&host->receiver_lock); - rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); - host->senders_disconnected_time = 0; - if (host->receiver != NULL) { - time_t age = now_realtime_sec() - host->receiver->last_msg_t; - if (age > 30) { - host->receiver->shutdown = 1; - shutdown(host->receiver->fd, SHUT_RDWR); - host->receiver = NULL; // Thread holds reference to structure - info( - "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " - "existing connection is dead (%"PRId64" sec), accepting new connection.", - rrdhost_hostname(host), - w->client_ip, - w->client_port, - (int64_t)age); - } - else { - netdata_mutex_unlock(&host->receiver_lock); - rrdhost_unlock(host); - rrd_unlock(); - log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host), - "REJECTED - ALREADY CONNECTED"); - info( - "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " - "existing connection is active (within last %"PRId64" sec), rejecting new connection.", - rrdhost_hostname(host), - w->client_ip, - w->client_port, - (int64_t)age); - // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up - buffer_flush(w->response.data); - buffer_strcat(w->response.data, "This GUID is already streaming to this server"); - freez(rpt); - return 409; + { + time_t age = 0; + bool receiver_stale = false; + bool receiver_working = false; + + rrd_rdlock(); + RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid); + if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ + host = NULL; + + if (host) { + netdata_mutex_lock(&host->receiver_lock); + if (host->receiver) { + age = now_realtime_sec() - host->receiver->last_msg_t; + + if (age < 30) + receiver_working = true; + else + receiver_stale = true; } + netdata_mutex_unlock(&host->receiver_lock); } - host->receiver = rpt; - netdata_mutex_unlock(&host->receiver_lock); - rrdhost_unlock(host); - } - rrd_unlock(); - - rpt->last_msg_t = now_realtime_sec(); - - rpt->host = host; - rpt->fd = w->ifd; - rpt->key = strdupz(key); - rpt->hostname = strdupz(hostname); - rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); - rpt->machine_guid = strdupz(machine_guid); - rpt->os = strdupz(os); - rpt->timezone = strdupz(timezone); - rpt->abbrev_timezone = strdupz(abbrev_timezone); - rpt->utc_offset = utc_offset; - rpt->tags = (tags)?strdupz(tags):NULL; - rpt->client_ip = strdupz(w->client_ip); - rpt->client_port = strdupz(w->client_port); - rpt->update_every = update_every; - rpt->system_info = system_info; - rpt->capabilities = stream_version; -#ifdef ENABLE_HTTPS - rpt->ssl.conn = w->ssl.conn; - rpt->ssl.flags = w->ssl.flags; - - w->ssl.conn = NULL; - w->ssl.flags = NETDATA_SSL_START; -#endif - - if(w->user_agent && w->user_agent[0]) { - char *t = strchr(w->user_agent, '/'); - if(t && *t) { - *t = '\0'; - t++; + rrd_unlock(); + + if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) { + // we stopped the receiver + // we can proceed with this connection + receiver_stale = false; + + info("STREAM '%s' [receive from [%s]:%s]: " + "stopped previous stale receiver to accept this one." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } - rpt->program_name = strdupz(w->user_agent); - if(t && *t) rpt->program_version = strdupz(t); + if (receiver_working || receiver_stale) { + // another receiver is already connected + // try again later + + char msg[200 + 1]; + snprintfz(msg, 200, + "multiple connections for same host, " + "old connection was used %ld secs ago%s", + age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)"); + + rrdpush_receive_log_status( + rpt, + msg, + "ALREADY CONNECTED"); + + // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "This GUID is already streaming to this server"); + receiver_state_free(rpt); + return HTTP_RESP_CONFLICT; + } } - - debug(D_SYSTEM, "starting STREAM receive thread."); char tag[FILENAME_MAX + 1]; - snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); - - if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) - error("Failed to create new STREAM receive thread for client."); - - // prevent the caller from closing the streaming socket - if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { - web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); - } - else { - if(w->ifd == w->ofd) - w->ifd = w->ofd = -1; - else - w->ifd = -1; + snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); + + if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { + rrdpush_receive_log_status( + rpt, + "can't create receiver thread", + "INTERNAL SERVER ERROR"); + + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "Can't handle this request"); + receiver_state_free(rpt); + return HTTP_RESP_INTERNAL_SERVER_ERROR; } - buffer_flush(w->response.data); - return 200; + // prevent the caller from closing the streaming socket + return web_client_socket_is_now_used_for_streaming(w); } static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { @@ -995,7 +1080,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) } void log_receiver_capabilities(struct receiver_state *rpt) { - BUFFER *wb = buffer_create(100); + BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", @@ -1005,7 +1090,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) { } void log_sender_capabilities(struct sender_state *s) { - BUFFER *wb = buffer_create(100); + BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", |