diff options
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r-- | src/rrdpush.c | 242 |
1 files changed, 167 insertions, 75 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c index 2d10c3ca9..8f71c6d4c 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -25,6 +25,11 @@ #define START_STREAMING_PROMPT "Hit me baby, push them over..." +typedef enum { + RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, + RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW +} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; + int default_rrdpush_enabled = 0; char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; @@ -86,7 +91,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // send the chart buffer_sprintf( host->rrdpush_sender_buffer - , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s\" \"%s\" \"%s\"\n" + , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , st->id , st->name , st->title @@ -99,6 +104,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":"" , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":"" , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" + , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":"" , (st->plugin_name)?st->plugin_name:"" , (st->module_name)?st->module_name:"" ); @@ -140,19 +146,20 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // sends the current chart dimensions static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) { - buffer_sprintf(st->rrdhost->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); + RRDHOST *host = st->rrdhost; + buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rd->updated && rd->exposed) - buffer_sprintf(st->rrdhost->rrdpush_sender_buffer + buffer_sprintf(host->rrdpush_sender_buffer , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n" , rd->id , rd->collected_value ); } - buffer_strcat(st->rrdhost->rrdpush_sender_buffer, "END\n"); + buffer_strcat(host->rrdpush_sender_buffer, "END\n"); } static void rrdpush_sender_thread_spawn(RRDHOST *host); @@ -289,7 +296,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { rrdpush_buffer_lock(host); rrdhost_wrlock(host); - pthread_t thr = 0; + netdata_thread_t thr = 0; if(host->rrdpush_sender_spawn) { info("STREAM %s [send]: signaling sending thread to stop...", host->hostname); @@ -302,9 +309,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { thr = host->rrdpush_sender_thread; // signal it to cancel - int ret = pthread_cancel(host->rrdpush_sender_thread); - if(ret != 0) - error("STREAM %s [send]: pthread_cancel() returned error.", host->hostname); + netdata_thread_cancel(host->rrdpush_sender_thread); } rrdhost_unlock(host); @@ -312,12 +317,8 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { if(thr != 0) { info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname); - void *result; - int ret = pthread_join(thr, &result); - if(ret != 0) - error("STREAM %s [send]: pthread_join() returned error.", host->hostname); - + netdata_thread_join(thr, &result); info("STREAM %s [send]: sending thread has exited.", host->hostname); } } @@ -363,7 +364,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po char http[HTTP_HEADER_SIZE + 1]; snprintfz(http, HTTP_HEADER_SIZE, "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s HTTP/1.1\r\n" - "User-Agent: netdata-push-service/%s\r\n" + "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" , host->rrdpush_send_api_key , host->hostname @@ -373,7 +374,8 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po , host->os , host->timezone , (host->tags)?host->tags:"" - , program_version + , host->program_name + , host->program_version ); if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) { @@ -435,8 +437,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { if(!host->rrdpush_sender_join) { info("STREAM %s [send]: sending thread detaches itself.", host->hostname); - if(pthread_detach(pthread_self())) - error("STREAM %s [send]: pthread_detach() failed.", host->hostname); + netdata_thread_detach(netdata_thread_self()); } host->rrdpush_sender_spawn = 0; @@ -452,18 +453,11 @@ void *rrdpush_sender_thread(void *ptr) { if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) { error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid()); - pthread_exit(NULL); return NULL; } info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid()); - if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - - if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel type to DEFERRED.", host->hostname); - int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60); int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999); size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); @@ -492,11 +486,11 @@ void *rrdpush_sender_thread(void *ptr) { size_t not_connected_loops = 0; - pthread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host); + netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host); for(; host->rrdpush_send_enabled && !netdata_exit ;) { // check for outstanding cancellation requests - pthread_testcancel(); + netdata_thread_testcancel(); // if we don't have socket open, lets wait a bit if(unlikely(host->rrdpush_sender_socket == -1)) { @@ -595,8 +589,7 @@ void *rrdpush_sender_thread(void *ptr) { // but the socket is in non-blocking mode // so, we will not block at send() - if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname); + netdata_thread_disable_cancelability(); debug(D_STREAM, "STREAM: Getting exclusive lock on host..."); rrdpush_buffer_lock(host); @@ -647,8 +640,7 @@ void *rrdpush_sender_thread(void *ptr) { debug(D_STREAM, "STREAM: Releasing exclusive lock on host..."); rrdpush_buffer_unlock(host); - if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); + netdata_thread_enable_cancelability(); // END RRDPUSH LOCKED SESSION } @@ -689,9 +681,7 @@ void *rrdpush_sender_thread(void *ptr) { } } - pthread_cleanup_pop(1); - - pthread_exit(NULL); + netdata_thread_cleanup_pop(1); return NULL; } @@ -703,7 +693,49 @@ static void log_stream_connection(const char *client_ip, const char *client_port log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); } -static int rrdpush_receive(int fd, const char *key, const char *hostname, const char *registry_hostname, const char *machine_guid, const char *os, const char *timezone, const char *tags, int update_every, char *client_ip, char *client_port) { +static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) { + char *value; + switch(def) { + default: + case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW: + value = "allow"; + break; + + case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW: + value = "deny"; + break; + } + + value = appconfig_get(c, section, name, value); + + RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def; + + if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0) + ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW; + + else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0) + ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW; + + else + error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value); + + return ret; +} + +static int rrdpush_receive(int fd + , const char *key + , const char *hostname + , const char *registry_hostname + , const char *machine_guid + , const char *os + , const char *timezone + , const char *tags + , const char *program_name + , const char *program_version + , int update_every + , char *client_ip + , char *client_port +) { RRDHOST *host; int history = default_rrd_history_entries; RRD_MEMORY_MODE mode = default_rrd_memory_mode; @@ -712,6 +744,7 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; time_t alarms_delay = 60; + RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW; update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every); if(update_every < 0) update_every = 1; @@ -738,6 +771,9 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key); rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key); + rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy); + rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy); + tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:""); if(tags && !*tags) tags = NULL; @@ -751,6 +787,8 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const , os , timezone , tags + , program_name + , program_version , update_every , history , mode @@ -821,8 +859,20 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const } rrdhost_wrlock(host); - if(host->connected_senders > 0) - info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port); + if(host->connected_senders > 0) { + switch(rrdpush_multiple_connections_strategy) { + case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW: + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port); + break; + + case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW: + rrdhost_unlock(host); + log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port); + fclose(fp); + return 0; + } + } rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->connected_senders++; @@ -877,35 +927,57 @@ struct rrdpush_thread { char *tags; char *client_ip; char *client_port; + char *program_name; + char *program_version; int update_every; }; -static void *rrdpush_receiver_thread(void *ptr) { - struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr; - - if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("STREAM %s [receive]: cannot set pthread cancel type to DEFERRED.", rpt->hostname); - - if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM %s [receive]: cannot set pthread cancel state to ENABLE.", rpt->hostname); - - - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->registry_hostname, rpt->machine_guid, rpt->os, rpt->timezone, rpt->tags, rpt->update_every, rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); +static void rrdpush_receiver_thread_cleanup(void *ptr) { + static __thread int executed = 0; + if(!executed) { + executed = 1; + struct rrdpush_thread *rpt = (struct rrdpush_thread *) ptr; + + info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + + freez(rpt->key); + freez(rpt->hostname); + freez(rpt->registry_hostname); + freez(rpt->machine_guid); + freez(rpt->os); + freez(rpt->timezone); + freez(rpt->tags); + freez(rpt->client_ip); + freez(rpt->client_port); + freez(rpt->program_name); + freez(rpt->program_version); + freez(rpt); + } +} - freez(rpt->key); - freez(rpt->hostname); - freez(rpt->registry_hostname); - freez(rpt->machine_guid); - freez(rpt->os); - freez(rpt->timezone); - freez(rpt->tags); - freez(rpt->client_ip); - freez(rpt->client_port); - freez(rpt); +static void *rrdpush_receiver_thread(void *ptr) { + netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); + + struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr; + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + + rrdpush_receive( + rpt->fd + , rpt->key + , rpt->hostname + , rpt->registry_hostname + , rpt->machine_guid + , rpt->os + , rpt->timezone + , rpt->tags + , rpt->program_name + , rpt->program_version + , rpt->update_every + , rpt->client_ip + , rpt->client_port + ); - pthread_exit(NULL); + netdata_thread_cleanup_pop(1); return NULL; } @@ -913,7 +985,10 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { rrdhost_wrlock(host); if(!host->rrdpush_sender_spawn) { - if(pthread_create(&host->rrdpush_sender_thread, NULL, rrdpush_sender_thread, (void *) host)) + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname); + + if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host)) error("STREAM %s [send]: failed to create new thread for client.", host->hostname); else host->rrdpush_sender_spawn = 1; @@ -933,7 +1008,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) { int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) { (void)host; - info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port); + info("clients wants to STREAM metrics."); char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL; int update_every = default_rrd_update_every; @@ -1004,7 +1079,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } { - SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); if(key_allow_from) { if(!simple_pattern_matches(key_allow_from, w->client_ip)) { simple_pattern_free(key_allow_from); @@ -1023,7 +1098,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } { - SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), SIMPLE_PATTERN_EXACT); + SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT); if(machine_allow_from) { if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { simple_pattern_free(machine_allow_from); @@ -1035,7 +1110,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } } - struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread)); + struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread)); rpt->fd = w->ifd; rpt->key = strdupz(key); rpt->hostname = strdupz(hostname); @@ -1047,21 +1122,38 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url rpt->client_ip = strdupz(w->client_ip); rpt->client_port = strdupz(w->client_port); rpt->update_every = update_every; - pthread_t thread; - debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port); + if(w->user_agent && w->user_agent[0]) { + char *t = strchr(w->user_agent, '/'); + if(t && *t) { + *t = '\0'; + t++; + } + + rpt->program_name = strdupz(w->user_agent); + if(t && *t) rpt->program_version = strdupz(t); + } + + netdata_thread_t thread; + + debug(D_SYSTEM, "starting STREAM receive thread."); - if(pthread_create(&thread, NULL, rrdpush_receiver_thread, (void *)rpt)) - error("STREAM [receive from [%s]:%s]: failed to create new thread for client.", w->client_ip, w->client_port); + char tag[FILENAME_MAX + 1]; + snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); - else if(pthread_detach(thread)) - error("STREAM [receive from [%s]:%s]: cannot request detach newly created thread.", w->client_ip, w->client_port); + if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) + error("Failed to create new STREAM receive thread for client."); // prevent the caller from closing the streaming socket - if(w->ifd == w->ofd) - w->ifd = w->ofd = -1; - else - w->ifd = -1; + if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { + web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); + } + else { + if(w->ifd == w->ofd) + w->ifd = w->ofd = -1; + else + w->ifd = -1; + } buffer_flush(w->response.data); return 200; |