summaryrefslogtreecommitdiffstats
path: root/src/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rrdpush.c242
1 files changed, 167 insertions, 75 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c
index 2d10c3ca9..8f71c6d4c 100644
--- a/src/rrdpush.c
+++ b/src/rrdpush.c
@@ -25,6 +25,11 @@
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+typedef enum {
+ RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
+ RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
+} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+
int default_rrdpush_enabled = 0;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -86,7 +91,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// send the chart
buffer_sprintf(
host->rrdpush_sender_buffer
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s\" \"%s\" \"%s\"\n"
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, st->id
, st->name
, st->title
@@ -99,6 +104,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
, rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
, rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
+ , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
, (st->plugin_name)?st->plugin_name:""
, (st->module_name)?st->module_name:""
);
@@ -140,19 +146,20 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// sends the current chart dimensions
static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
- buffer_sprintf(st->rrdhost->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0);
+ RRDHOST *host = st->rrdhost;
+ buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(rd->updated && rd->exposed)
- buffer_sprintf(st->rrdhost->rrdpush_sender_buffer
+ buffer_sprintf(host->rrdpush_sender_buffer
, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
);
}
- buffer_strcat(st->rrdhost->rrdpush_sender_buffer, "END\n");
+ buffer_strcat(host->rrdpush_sender_buffer, "END\n");
}
static void rrdpush_sender_thread_spawn(RRDHOST *host);
@@ -289,7 +296,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
rrdpush_buffer_lock(host);
rrdhost_wrlock(host);
- pthread_t thr = 0;
+ netdata_thread_t thr = 0;
if(host->rrdpush_sender_spawn) {
info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
@@ -302,9 +309,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
thr = host->rrdpush_sender_thread;
// signal it to cancel
- int ret = pthread_cancel(host->rrdpush_sender_thread);
- if(ret != 0)
- error("STREAM %s [send]: pthread_cancel() returned error.", host->hostname);
+ netdata_thread_cancel(host->rrdpush_sender_thread);
}
rrdhost_unlock(host);
@@ -312,12 +317,8 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
if(thr != 0) {
info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
-
void *result;
- int ret = pthread_join(thr, &result);
- if(ret != 0)
- error("STREAM %s [send]: pthread_join() returned error.", host->hostname);
-
+ netdata_thread_join(thr, &result);
info("STREAM %s [send]: sending thread has exited.", host->hostname);
}
}
@@ -363,7 +364,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
char http[HTTP_HEADER_SIZE + 1];
snprintfz(http, HTTP_HEADER_SIZE,
"STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s HTTP/1.1\r\n"
- "User-Agent: netdata-push-service/%s\r\n"
+ "User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_send_api_key
, host->hostname
@@ -373,7 +374,8 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
, host->os
, host->timezone
, (host->tags)?host->tags:""
- , program_version
+ , host->program_name
+ , host->program_version
);
if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
@@ -435,8 +437,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
if(!host->rrdpush_sender_join) {
info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
- if(pthread_detach(pthread_self()))
- error("STREAM %s [send]: pthread_detach() failed.", host->hostname);
+ netdata_thread_detach(netdata_thread_self());
}
host->rrdpush_sender_spawn = 0;
@@ -452,18 +453,11 @@ void *rrdpush_sender_thread(void *ptr) {
if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) {
error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid());
- pthread_exit(NULL);
return NULL;
}
info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid());
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
-
- if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel type to DEFERRED.", host->hostname);
-
int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
@@ -492,11 +486,11 @@ void *rrdpush_sender_thread(void *ptr) {
size_t not_connected_loops = 0;
- pthread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host);
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host);
for(; host->rrdpush_send_enabled && !netdata_exit ;) {
// check for outstanding cancellation requests
- pthread_testcancel();
+ netdata_thread_testcancel();
// if we don't have socket open, lets wait a bit
if(unlikely(host->rrdpush_sender_socket == -1)) {
@@ -595,8 +589,7 @@ void *rrdpush_sender_thread(void *ptr) {
// but the socket is in non-blocking mode
// so, we will not block at send()
- if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+ netdata_thread_disable_cancelability();
debug(D_STREAM, "STREAM: Getting exclusive lock on host...");
rrdpush_buffer_lock(host);
@@ -647,8 +640,7 @@ void *rrdpush_sender_thread(void *ptr) {
debug(D_STREAM, "STREAM: Releasing exclusive lock on host...");
rrdpush_buffer_unlock(host);
- if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
+ netdata_thread_enable_cancelability();
// END RRDPUSH LOCKED SESSION
}
@@ -689,9 +681,7 @@ void *rrdpush_sender_thread(void *ptr) {
}
}
- pthread_cleanup_pop(1);
-
- pthread_exit(NULL);
+ netdata_thread_cleanup_pop(1);
return NULL;
}
@@ -703,7 +693,49 @@ static void log_stream_connection(const char *client_ip, const char *client_port
log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
-static int rrdpush_receive(int fd, const char *key, const char *hostname, const char *registry_hostname, const char *machine_guid, const char *os, const char *timezone, const char *tags, int update_every, char *client_ip, char *client_port) {
+static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
+ char *value;
+ switch(def) {
+ default:
+ case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
+ value = "allow";
+ break;
+
+ case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
+ value = "deny";
+ break;
+ }
+
+ value = appconfig_get(c, section, name, value);
+
+ RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
+
+ if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
+ ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
+
+ else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
+ ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
+
+ else
+ error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
+
+ return ret;
+}
+
+static int rrdpush_receive(int fd
+ , const char *key
+ , const char *hostname
+ , const char *registry_hostname
+ , const char *machine_guid
+ , const char *os
+ , const char *timezone
+ , const char *tags
+ , const char *program_name
+ , const char *program_version
+ , int update_every
+ , char *client_ip
+ , char *client_port
+) {
RRDHOST *host;
int history = default_rrd_history_entries;
RRD_MEMORY_MODE mode = default_rrd_memory_mode;
@@ -712,6 +744,7 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
time_t alarms_delay = 60;
+ RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
@@ -738,6 +771,9 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const
rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
+ rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
+ rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
+
tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
if(tags && !*tags) tags = NULL;
@@ -751,6 +787,8 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const
, os
, timezone
, tags
+ , program_name
+ , program_version
, update_every
, history
, mode
@@ -821,8 +859,20 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const
}
rrdhost_wrlock(host);
- if(host->connected_senders > 0)
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
+ if(host->connected_senders > 0) {
+ switch(rrdpush_multiple_connections_strategy) {
+ case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
+ break;
+
+ case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
+ rrdhost_unlock(host);
+ log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
+ fclose(fp);
+ return 0;
+ }
+ }
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
host->connected_senders++;
@@ -877,35 +927,57 @@ struct rrdpush_thread {
char *tags;
char *client_ip;
char *client_port;
+ char *program_name;
+ char *program_version;
int update_every;
};
-static void *rrdpush_receiver_thread(void *ptr) {
- struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
-
- if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("STREAM %s [receive]: cannot set pthread cancel type to DEFERRED.", rpt->hostname);
-
- if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [receive]: cannot set pthread cancel state to ENABLE.", rpt->hostname);
-
-
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->registry_hostname, rpt->machine_guid, rpt->os, rpt->timezone, rpt->tags, rpt->update_every, rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ static __thread int executed = 0;
+ if(!executed) {
+ executed = 1;
+ struct rrdpush_thread *rpt = (struct rrdpush_thread *) ptr;
+
+ info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
+ freez(rpt->key);
+ freez(rpt->hostname);
+ freez(rpt->registry_hostname);
+ freez(rpt->machine_guid);
+ freez(rpt->os);
+ freez(rpt->timezone);
+ freez(rpt->tags);
+ freez(rpt->client_ip);
+ freez(rpt->client_port);
+ freez(rpt->program_name);
+ freez(rpt->program_version);
+ freez(rpt);
+ }
+}
- freez(rpt->key);
- freez(rpt->hostname);
- freez(rpt->registry_hostname);
- freez(rpt->machine_guid);
- freez(rpt->os);
- freez(rpt->timezone);
- freez(rpt->tags);
- freez(rpt->client_ip);
- freez(rpt->client_port);
- freez(rpt);
+static void *rrdpush_receiver_thread(void *ptr) {
+ netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
+
+ struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
+ rrdpush_receive(
+ rpt->fd
+ , rpt->key
+ , rpt->hostname
+ , rpt->registry_hostname
+ , rpt->machine_guid
+ , rpt->os
+ , rpt->timezone
+ , rpt->tags
+ , rpt->program_name
+ , rpt->program_version
+ , rpt->update_every
+ , rpt->client_ip
+ , rpt->client_port
+ );
- pthread_exit(NULL);
+ netdata_thread_cleanup_pop(1);
return NULL;
}
@@ -913,7 +985,10 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
rrdhost_wrlock(host);
if(!host->rrdpush_sender_spawn) {
- if(pthread_create(&host->rrdpush_sender_thread, NULL, rrdpush_sender_thread, (void *) host))
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
+
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host))
error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
else
host->rrdpush_sender_spawn = 1;
@@ -933,7 +1008,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
(void)host;
- info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
+ info("clients wants to STREAM metrics.");
char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
@@ -1004,7 +1079,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
{
- SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
simple_pattern_free(key_allow_from);
@@ -1023,7 +1098,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
{
- SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
simple_pattern_free(machine_allow_from);
@@ -1035,7 +1110,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
}
- struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread));
+ struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread));
rpt->fd = w->ifd;
rpt->key = strdupz(key);
rpt->hostname = strdupz(hostname);
@@ -1047,21 +1122,38 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
rpt->client_ip = strdupz(w->client_ip);
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
- pthread_t thread;
- debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port);
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
+ }
+
+ netdata_thread_t thread;
+
+ debug(D_SYSTEM, "starting STREAM receive thread.");
- if(pthread_create(&thread, NULL, rrdpush_receiver_thread, (void *)rpt))
- error("STREAM [receive from [%s]:%s]: failed to create new thread for client.", w->client_ip, w->client_port);
+ char tag[FILENAME_MAX + 1];
+ snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
- else if(pthread_detach(thread))
- error("STREAM [receive from [%s]:%s]: cannot request detach newly created thread.", w->client_ip, w->client_port);
+ if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+ error("Failed to create new STREAM receive thread for client.");
// prevent the caller from closing the streaming socket
- if(w->ifd == w->ofd)
- w->ifd = w->ofd = -1;
- else
- w->ifd = -1;
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
buffer_flush(w->response.data);
return 200;