summaryrefslogtreecommitdiffstats
path: root/src/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r--src/rrdpush.c249
1 files changed, 168 insertions, 81 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c
index 72e6d8a73..6def90fe5 100644
--- a/src/rrdpush.c
+++ b/src/rrdpush.c
@@ -57,13 +57,16 @@ int rrdpush_init() {
// to its current clock, we send for this many
// iterations a BEGIN line without microseconds
// this is for the first iterations of each chart
-static unsigned int remote_clock_resync_iterations = 60;
+unsigned int remote_clock_resync_iterations = 60;
#define rrdpush_lock(host) netdata_mutex_lock(&((host)->rrdpush_mutex))
#define rrdpush_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_mutex))
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
+ if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_EXPOSED_UPSTREAM))))
+ return 1;
+
RRDDIM *rd;
rrddim_foreach_read(rd, st)
if(!rd->exposed)
@@ -74,7 +77,9 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
// sends the current chart definition
static inline void send_chart_definition(RRDSET *st) {
- buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
+ rrdset_flag_set(st, RRDSET_FLAG_EXPOSED_UPSTREAM);
+
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s\"\n"
, st->id
, st->name
, st->title
@@ -84,11 +89,14 @@ static inline void send_chart_definition(RRDSET *st) {
, rrdset_type_name(st->chart_type)
, st->priority
, st->update_every
+ , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
+ , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
+ , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s\"\n"
, rd->id
, rd->name
, rrd_algorithm_name(rd->algorithm)
@@ -99,11 +107,13 @@ static inline void send_chart_definition(RRDSET *st) {
);
rd->exposed = 1;
}
+
+ st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
}
// sends the current chart dimensions
static inline void send_chart_metrics(RRDSET *st) {
- buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
@@ -117,7 +127,17 @@ static inline void send_chart_metrics(RRDSET *st) {
buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n");
}
-void rrdpush_sender_thread_spawn(RRDHOST *host);
+static void rrdpush_sender_thread_spawn(RRDHOST *host);
+
+void rrdset_push_chart_definition(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ rrdset_rdlock(st);
+ rrdpush_lock(host);
+ send_chart_definition(st);
+ rrdpush_unlock(host);
+ rrdset_unlock(st);
+}
void rrdset_done_push(RRDSET *st) {
RRDHOST *host = st->rrdhost;
@@ -167,9 +187,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- // make it re-align the current time
- // on the remote host
- st->counter_done = 0;
+ st->upstream_resync_time = 0;
rrdset_rdlock(st);
@@ -219,8 +237,6 @@ static void rrdpush_sender_thread_cleanup_locked_all(RRDHOST *host) {
host->rrdpush_buffer = NULL;
host->rrdpush_spawn = 0;
-
- rrdhost_flag_set(host, RRDHOST_ORPHAN);
}
void rrdpush_sender_thread_stop(RRDHOST *host) {
@@ -274,6 +290,7 @@ void *rrdpush_sender_thread(void *ptr) {
.tv_usec = 0
};
+ time_t last_sent_t = 0;
struct pollfd fds[2], *ifd, *ofd;
nfds_t fdmax;
@@ -281,8 +298,16 @@ void *rrdpush_sender_thread(void *ptr) {
ofd = &fds[1];
for(; host->rrdpush_enabled && !netdata_exit ;) {
+ debug(D_STREAM, "STREAM: Checking if we need to timeout the connection...");
+ if(host->rrdpush_socket != -1 && now_monotonic_sec() - last_sent_t > timeout) {
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, timeout, sent_connection);
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
+ }
if(unlikely(host->rrdpush_socket == -1)) {
+ debug(D_STREAM, "STREAM: Attempting to connect...");
+
// stop appending data into rrdpush_buffer
// they will be lost, so there is no point to do it
host->rrdpush_connected = 0;
@@ -298,16 +323,19 @@ void *rrdpush_sender_thread(void *ptr) {
info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to);
- char http[1000 + 1];
- snprintfz(http, 1000,
- "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
+ #define HTTP_HEADER_SIZE 8192
+ char http[HTTP_HEADER_SIZE + 1];
+ snprintfz(http, HTTP_HEADER_SIZE,
+ "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&tags=%s HTTP/1.1\r\n"
"User-Agent: netdata-push-service/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_api_key
, host->hostname
+ , host->registry_hostname
, host->machine_guid
- , host->os
, default_rrd_update_every
+ , host->os
+ , (host->tags)?host->tags:""
, program_version
);
@@ -321,7 +349,7 @@ void *rrdpush_sender_thread(void *ptr) {
info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to);
- if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) {
+ if(recv_timeout(host->rrdpush_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
close(host->rrdpush_socket);
host->rrdpush_socket = -1;
error("STREAM %s [send to %s]: failed to initialize communication", host->hostname, connected_to);
@@ -338,15 +366,21 @@ void *rrdpush_sender_thread(void *ptr) {
}
info("STREAM %s [send to %s]: established communication - sending metrics...", host->hostname, connected_to);
+ last_sent_t = now_monotonic_sec();
- if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0)
+ if(sock_setnonblock(host->rrdpush_socket) < 0)
error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to);
+ if(sock_enlarge_out(host->rrdpush_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to);
+
rrdpush_sender_thread_data_flush(host);
sent_connection = 0;
// allow appending data into rrdpush_buffer
host->rrdpush_connected = 1;
+
+ debug(D_STREAM, "Connected...");
}
ifd->fd = host->rrdpush_pipe[PIPE_READ];
@@ -356,82 +390,113 @@ void *rrdpush_sender_thread(void *ptr) {
ofd->fd = host->rrdpush_socket;
ofd->revents = 0;
if(begin < buffer_strlen(host->rrdpush_buffer)) {
+ debug(D_STREAM, "STREAM: Requesting data output on streaming socket...");
ofd->events = POLLOUT;
fdmax = 2;
}
else {
+ debug(D_STREAM, "STREAM: Not requesting data output on streaming socket (nothing to send now)...");
ofd->events = 0;
fdmax = 1;
}
+ debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer));
if(netdata_exit) break;
- int retval = poll(fds, fdmax, timeout * 1000);
+ int retval = poll(fds, fdmax, 1000);
if(netdata_exit) break;
if(unlikely(retval == -1)) {
- if(errno == EAGAIN || errno == EINTR)
+ debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer));
+
+ if(errno == EAGAIN || errno == EINTR) {
+ debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR...");
continue;
+ }
error("STREAM %s [send to %s]: failed to poll().", host->hostname, connected_to);
close(host->rrdpush_socket);
host->rrdpush_socket = -1;
break;
}
- else if(unlikely(!retval)) {
- // timeout
- continue;
- }
-
- if(ifd->revents & POLLIN) {
- char buffer[1000 + 1];
- if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to);
- }
-
- if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
+ else if(likely(retval)) {
+ if (ifd->revents & POLLIN) {
+ debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer));
- // BEGIN RRDPUSH LOCKED SESSION
-
- // during this session, data collectors
- // will not be able to append data to our buffer
- // but the socket is in non-blocking mode
- // so, we will not block at send()
-
- if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
-
- rrdpush_lock(host);
+ char buffer[1000 + 1];
+ if (read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to);
+ }
- ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
- if(ret == -1) {
- if(errno != EAGAIN && errno != EINTR) {
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection);
+ if (ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
+ debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer));
+
+ // BEGIN RRDPUSH LOCKED SESSION
+
+ // during this session, data collectors
+ // will not be able to append data to our buffer
+ // but the socket is in non-blocking mode
+ // so, we will not block at send()
+
+ if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+
+ debug(D_STREAM, "STREAM: Getting exclusive lock on host...");
+ rrdpush_lock(host);
+
+ debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_buffer));
+ ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
+ if (unlikely(ret == -1)) {
+ if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
+ debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection);
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
+ }
+ else {
+ debug(D_STREAM, "STREAM: Send failed - will retry...");
+ }
+ }
+ else if(likely(ret > 0)) {
+ sent_connection += ret;
+ sent_bytes += ret;
+ begin += ret;
+
+ if (begin == buffer_strlen(host->rrdpush_buffer)) {
+ // we send it all
+
+ debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret);
+ buffer_flush(host->rrdpush_buffer);
+ begin = 0;
+ }
+ else {
+ debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret);
+ }
+
+ last_sent_t = now_monotonic_sec();
+ }
+ else {
+ debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret);
+ error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, ret, sent_connection);
close(host->rrdpush_socket);
host->rrdpush_socket = -1;
}
- }
- else {
- sent_connection += ret;
- sent_bytes += ret;
- begin += ret;
- if(begin == buffer_strlen(host->rrdpush_buffer)) {
- // we send it all
-
- buffer_flush(host->rrdpush_buffer);
- begin = 0;
- }
- }
- rrdpush_unlock(host);
+ debug(D_STREAM, "STREAM: Releasing exclusive lock on host...");
+ rrdpush_unlock(host);
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
+ if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
- // END RRDPUSH LOCKED SESSION
+ // END RRDPUSH LOCKED SESSION
+ }
+ }
+ else {
+ debug(D_STREAM, "STREAM: poll() timed out.");
}
// protection from overflow
- if(host->rrdpush_buffer->len > max_size) {
+ if(buffer_strlen(host->rrdpush_buffer) > max_size) {
+ debug(D_STREAM, "STREAM: Buffer is too big (%zu bytes), bigger than the max (%zu) - flushing it...", buffer_strlen(host->rrdpush_buffer), max_size);
errno = 0;
error("STREAM %s [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", host->hostname, connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection);
if(host->rrdpush_socket != -1) {
@@ -464,7 +529,7 @@ cleanup:
// ----------------------------------------------------------------------------
// rrdpush receiver thread
-int rrdpush_receive(int fd, const char *key, const char *hostname, const char *machine_guid, const char *os, int update_every, char *client_ip, char *client_port) {
+static int rrdpush_receive(int fd, const char *key, const char *hostname, const char *registry_hostname, const char *machine_guid, const char *os, const char *tags, int update_every, char *client_ip, char *client_port) {
RRDHOST *host;
int history = default_rrd_history_entries;
RRD_MEMORY_MODE mode = default_rrd_memory_mode;
@@ -499,13 +564,18 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
+ tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
+ if(tags && !*tags) tags = NULL;
+
if(!strcmp(machine_guid, "localhost"))
host = localhost;
else
host = rrdhost_find_or_create(
hostname
+ , registry_hostname
, machine_guid
, os
+ , tags
, update_every
, history
, mode
@@ -522,7 +592,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
}
#ifdef NETDATA_INTERNAL_CHECKS
- info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s"
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s, tags '%s'"
, hostname
, client_ip
, client_port
@@ -532,6 +602,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
, host->rrd_history_entries
, rrd_memory_mode_name(host->rrd_memory_mode)
, (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
+ , host->tags
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -560,7 +631,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
}
// remove the non-blocking flag from the socket
- if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
+ if(sock_delnonblock(fd) < 0)
error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", host->hostname, client_ip, client_port, fd);
// convert the socket to a FILE *
@@ -572,7 +643,11 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
}
rrdhost_wrlock(host);
+ if(host->connected_senders > 0)
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
+
host->connected_senders++;
+ rrdhost_flag_clear(host, RRDHOST_ORPHAN);
if(health_enabled != CONFIG_BOOLEAN_NO)
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
rrdhost_unlock(host);
@@ -586,6 +661,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
host->senders_disconnected_time = now_realtime_sec();
host->connected_senders--;
if(!host->connected_senders) {
+ rrdhost_flag_set(host, RRDHOST_ORPHAN);
if(health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
}
@@ -603,14 +679,16 @@ struct rrdpush_thread {
int fd;
char *key;
char *hostname;
+ char *registry_hostname;
char *machine_guid;
char *os;
+ char *tags;
char *client_ip;
char *client_port;
int update_every;
};
-void *rrdpush_receiver_thread(void *ptr) {
+static void *rrdpush_receiver_thread(void *ptr) {
struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
@@ -621,13 +699,15 @@ void *rrdpush_receiver_thread(void *ptr) {
info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port);
+ rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->registry_hostname, rpt->machine_guid, rpt->os, rpt->tags, rpt->update_every, rpt->client_ip, rpt->client_port);
info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
freez(rpt->key);
freez(rpt->hostname);
+ freez(rpt->registry_hostname);
freez(rpt->machine_guid);
freez(rpt->os);
+ freez(rpt->tags);
freez(rpt->client_ip);
freez(rpt->client_port);
freez(rpt);
@@ -636,7 +716,7 @@ void *rrdpush_receiver_thread(void *ptr) {
return NULL;
}
-void rrdpush_sender_thread_spawn(RRDHOST *host) {
+static void rrdpush_sender_thread_spawn(RRDHOST *host) {
rrdhost_wrlock(host);
if(!host->rrdpush_spawn) {
@@ -646,7 +726,6 @@ void rrdpush_sender_thread_spawn(RRDHOST *host) {
else if(pthread_detach(host->rrdpush_thread))
error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname);
- rrdhost_flag_clear(host, RRDHOST_ORPHAN);
host->rrdpush_spawn = 1;
}
@@ -658,7 +737,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
- char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = "unknown";
+ char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
char buf[GUID_LEN + 1];
@@ -674,12 +753,18 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
key = value;
else if(!strcmp(name, "hostname"))
hostname = value;
+ else if(!strcmp(name, "registry_hostname"))
+ registry_hostname = value;
else if(!strcmp(name, "machine_guid"))
machine_guid = value;
else if(!strcmp(name, "update_every"))
update_every = (int)strtoul(value, NULL, 0);
else if(!strcmp(name, "os"))
os = value;
+ else if(!strcmp(name, "tags"))
+ tags = value;
+ else
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
}
if(!key || !*key) {
@@ -704,21 +789,21 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
if(regenerate_guid(key, buf) == -1) {
- error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key);
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is invalid.");
return 401;
}
if(regenerate_guid(machine_guid, buf) == -1) {
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key);
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your machine GUID is invalid.");
return 404;
}
if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
- error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");
return 401;
@@ -732,14 +817,16 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
}
struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread));
- rpt->fd = w->ifd;
- rpt->key = strdupz(key);
- rpt->hostname = strdupz(hostname);
- rpt->machine_guid = strdupz(machine_guid);
- rpt->os = strdupz(os);
- rpt->client_ip = strdupz(w->client_ip);
- rpt->client_port = strdupz(w->client_port);
- rpt->update_every = update_every;
+ rpt->fd = w->ifd;
+ rpt->key = strdupz(key);
+ rpt->hostname = strdupz(hostname);
+ rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
+ rpt->machine_guid = strdupz(machine_guid);
+ rpt->os = strdupz(os);
+ rpt->tags = (tags)?strdupz(tags):NULL;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+ rpt->update_every = update_every;
pthread_t thread;
debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port);