diff options
author | Lennart Weller <lhw@ring0.de> | 2017-07-27 09:55:47 +0000 |
---|---|---|
committer | Lennart Weller <lhw@ring0.de> | 2017-07-27 09:55:47 +0000 |
commit | a133c9c3b637b1dbe7b5b053f7e2572c1950cead (patch) | |
tree | 2207939a88e96bca329457f40a9d9d18ab659dc1 /src/rrdpush.c | |
parent | New upstream version 1.6.0+dfsg (diff) | |
download | netdata-a133c9c3b637b1dbe7b5b053f7e2572c1950cead.tar.xz netdata-a133c9c3b637b1dbe7b5b053f7e2572c1950cead.zip |
New upstream version 1.7.0+dfsgupstream/1.7.0+dfsg
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r-- | src/rrdpush.c | 249 |
1 files changed, 168 insertions, 81 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c index 72e6d8a7..6def90fe 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -57,13 +57,16 @@ int rrdpush_init() { // to its current clock, we send for this many // iterations a BEGIN line without microseconds // this is for the first iterations of each chart -static unsigned int remote_clock_resync_iterations = 60; +unsigned int remote_clock_resync_iterations = 60; #define rrdpush_lock(host) netdata_mutex_lock(&((host)->rrdpush_mutex)) #define rrdpush_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_mutex)) // checks if the current chart definition has been sent static inline int need_to_send_chart_definition(RRDSET *st) { + if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_EXPOSED_UPSTREAM)))) + return 1; + RRDDIM *rd; rrddim_foreach_read(rd, st) if(!rd->exposed) @@ -74,7 +77,9 @@ static inline int need_to_send_chart_definition(RRDSET *st) { // sends the current chart definition static inline void send_chart_definition(RRDSET *st) { - buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n" + rrdset_flag_set(st, RRDSET_FLAG_EXPOSED_UPSTREAM); + + buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s\"\n" , st->id , st->name , st->title @@ -84,11 +89,14 @@ static inline void send_chart_definition(RRDSET *st) { , rrdset_type_name(st->chart_type) , st->priority , st->update_every + , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":"" + , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":"" + , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" ); RRDDIM *rd; rrddim_foreach_read(rd, st) { - buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n" + buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s\"\n" , rd->id , rd->name , rrd_algorithm_name(rd->algorithm) @@ -99,11 +107,13 @@ static inline void send_chart_definition(RRDSET *st) { ); rd->exposed = 1; } + + st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); } // sends the current chart dimensions static inline void send_chart_metrics(RRDSET *st) { - buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0); + buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); RRDDIM *rd; rrddim_foreach_read(rd, st) { @@ -117,7 +127,17 @@ static inline void send_chart_metrics(RRDSET *st) { buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n"); } -void rrdpush_sender_thread_spawn(RRDHOST *host); +static void rrdpush_sender_thread_spawn(RRDHOST *host); + +void rrdset_push_chart_definition(RRDSET *st) { + RRDHOST *host = st->rrdhost; + + rrdset_rdlock(st); + rrdpush_lock(host); + send_chart_definition(st); + rrdpush_unlock(host); + rrdset_unlock(st); +} void rrdset_done_push(RRDSET *st) { RRDHOST *host = st->rrdhost; @@ -167,9 +187,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDSET *st; rrdset_foreach_read(st, host) { - // make it re-align the current time - // on the remote host - st->counter_done = 0; + st->upstream_resync_time = 0; rrdset_rdlock(st); @@ -219,8 +237,6 @@ static void rrdpush_sender_thread_cleanup_locked_all(RRDHOST *host) { host->rrdpush_buffer = NULL; host->rrdpush_spawn = 0; - - rrdhost_flag_set(host, RRDHOST_ORPHAN); } void rrdpush_sender_thread_stop(RRDHOST *host) { @@ -274,6 +290,7 @@ void *rrdpush_sender_thread(void *ptr) { .tv_usec = 0 }; + time_t last_sent_t = 0; struct pollfd fds[2], *ifd, *ofd; nfds_t fdmax; @@ -281,8 +298,16 @@ void *rrdpush_sender_thread(void *ptr) { ofd = &fds[1]; for(; host->rrdpush_enabled && !netdata_exit ;) { + debug(D_STREAM, "STREAM: Checking if we need to timeout the connection..."); + if(host->rrdpush_socket != -1 && now_monotonic_sec() - last_sent_t > timeout) { + error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, timeout, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; + } if(unlikely(host->rrdpush_socket == -1)) { + debug(D_STREAM, "STREAM: Attempting to connect..."); + // stop appending data into rrdpush_buffer // they will be lost, so there is no point to do it host->rrdpush_connected = 0; @@ -298,16 +323,19 @@ void *rrdpush_sender_thread(void *ptr) { info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to); - char http[1000 + 1]; - snprintfz(http, 1000, - "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n" + #define HTTP_HEADER_SIZE 8192 + char http[HTTP_HEADER_SIZE + 1]; + snprintfz(http, HTTP_HEADER_SIZE, + "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&tags=%s HTTP/1.1\r\n" "User-Agent: netdata-push-service/%s\r\n" "Accept: */*\r\n\r\n" , host->rrdpush_api_key , host->hostname + , host->registry_hostname , host->machine_guid - , host->os , default_rrd_update_every + , host->os + , (host->tags)?host->tags:"" , program_version ); @@ -321,7 +349,7 @@ void *rrdpush_sender_thread(void *ptr) { info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to); - if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) { + if(recv_timeout(host->rrdpush_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) { close(host->rrdpush_socket); host->rrdpush_socket = -1; error("STREAM %s [send to %s]: failed to initialize communication", host->hostname, connected_to); @@ -338,15 +366,21 @@ void *rrdpush_sender_thread(void *ptr) { } info("STREAM %s [send to %s]: established communication - sending metrics...", host->hostname, connected_to); + last_sent_t = now_monotonic_sec(); - if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0) + if(sock_setnonblock(host->rrdpush_socket) < 0) error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to); + if(sock_enlarge_out(host->rrdpush_socket) < 0) + error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to); + rrdpush_sender_thread_data_flush(host); sent_connection = 0; // allow appending data into rrdpush_buffer host->rrdpush_connected = 1; + + debug(D_STREAM, "Connected..."); } ifd->fd = host->rrdpush_pipe[PIPE_READ]; @@ -356,82 +390,113 @@ void *rrdpush_sender_thread(void *ptr) { ofd->fd = host->rrdpush_socket; ofd->revents = 0; if(begin < buffer_strlen(host->rrdpush_buffer)) { + debug(D_STREAM, "STREAM: Requesting data output on streaming socket..."); ofd->events = POLLOUT; fdmax = 2; } else { + debug(D_STREAM, "STREAM: Not requesting data output on streaming socket (nothing to send now)..."); ofd->events = 0; fdmax = 1; } + debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); if(netdata_exit) break; - int retval = poll(fds, fdmax, timeout * 1000); + int retval = poll(fds, fdmax, 1000); if(netdata_exit) break; if(unlikely(retval == -1)) { - if(errno == EAGAIN || errno == EINTR) + debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); + + if(errno == EAGAIN || errno == EINTR) { + debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR..."); continue; + } error("STREAM %s [send to %s]: failed to poll().", host->hostname, connected_to); close(host->rrdpush_socket); host->rrdpush_socket = -1; break; } - else if(unlikely(!retval)) { - // timeout - continue; - } - - if(ifd->revents & POLLIN) { - char buffer[1000 + 1]; - if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) - error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to); - } - - if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) { + else if(likely(retval)) { + if (ifd->revents & POLLIN) { + debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); - // BEGIN RRDPUSH LOCKED SESSION - - // during this session, data collectors - // will not be able to append data to our buffer - // but the socket is in non-blocking mode - // so, we will not block at send() - - if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname); - - rrdpush_lock(host); + char buffer[1000 + 1]; + if (read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) + error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to); + } - ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT); - if(ret == -1) { - if(errno != EAGAIN && errno != EINTR) { - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + if (ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) { + debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); + + // BEGIN RRDPUSH LOCKED SESSION + + // during this session, data collectors + // will not be able to append data to our buffer + // but the socket is in non-blocking mode + // so, we will not block at send() + + if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) + error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname); + + debug(D_STREAM, "STREAM: Getting exclusive lock on host..."); + rrdpush_lock(host); + + debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_buffer)); + ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT); + if (unlikely(ret == -1)) { + if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { + debug(D_STREAM, "STREAM: Send failed - closing socket..."); + error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; + } + else { + debug(D_STREAM, "STREAM: Send failed - will retry..."); + } + } + else if(likely(ret > 0)) { + sent_connection += ret; + sent_bytes += ret; + begin += ret; + + if (begin == buffer_strlen(host->rrdpush_buffer)) { + // we send it all + + debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret); + buffer_flush(host->rrdpush_buffer); + begin = 0; + } + else { + debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret); + } + + last_sent_t = now_monotonic_sec(); + } + else { + debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret); + error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, ret, sent_connection); close(host->rrdpush_socket); host->rrdpush_socket = -1; } - } - else { - sent_connection += ret; - sent_bytes += ret; - begin += ret; - if(begin == buffer_strlen(host->rrdpush_buffer)) { - // we send it all - - buffer_flush(host->rrdpush_buffer); - begin = 0; - } - } - rrdpush_unlock(host); + debug(D_STREAM, "STREAM: Releasing exclusive lock on host..."); + rrdpush_unlock(host); - if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); + if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) + error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - // END RRDPUSH LOCKED SESSION + // END RRDPUSH LOCKED SESSION + } + } + else { + debug(D_STREAM, "STREAM: poll() timed out."); } // protection from overflow - if(host->rrdpush_buffer->len > max_size) { + if(buffer_strlen(host->rrdpush_buffer) > max_size) { + debug(D_STREAM, "STREAM: Buffer is too big (%zu bytes), bigger than the max (%zu) - flushing it...", buffer_strlen(host->rrdpush_buffer), max_size); errno = 0; error("STREAM %s [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", host->hostname, connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection); if(host->rrdpush_socket != -1) { @@ -464,7 +529,7 @@ cleanup: // ---------------------------------------------------------------------------- // rrdpush receiver thread -int rrdpush_receive(int fd, const char *key, const char *hostname, const char *machine_guid, const char *os, int update_every, char *client_ip, char *client_port) { +static int rrdpush_receive(int fd, const char *key, const char *hostname, const char *registry_hostname, const char *machine_guid, const char *os, const char *tags, int update_every, char *client_ip, char *client_port) { RRDHOST *host; int history = default_rrd_history_entries; RRD_MEMORY_MODE mode = default_rrd_memory_mode; @@ -499,13 +564,18 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key); rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key); + tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:""); + if(tags && !*tags) tags = NULL; + if(!strcmp(machine_guid, "localhost")) host = localhost; else host = rrdhost_find_or_create( hostname + , registry_hostname , machine_guid , os + , tags , update_every , history , mode @@ -522,7 +592,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m } #ifdef NETDATA_INTERNAL_CHECKS - info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s" + info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s, tags '%s'" , hostname , client_ip , client_port @@ -532,6 +602,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m , host->rrd_history_entries , rrd_memory_mode_name(host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") + , host->tags ); #endif // NETDATA_INTERNAL_CHECKS @@ -560,7 +631,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m } // remove the non-blocking flag from the socket - if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1) + if(sock_delnonblock(fd) < 0) error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", host->hostname, client_ip, client_port, fd); // convert the socket to a FILE * @@ -572,7 +643,11 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m } rrdhost_wrlock(host); + if(host->connected_senders > 0) + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port); + host->connected_senders++; + rrdhost_flag_clear(host, RRDHOST_ORPHAN); if(health_enabled != CONFIG_BOOLEAN_NO) host->health_delay_up_to = now_realtime_sec() + alarms_delay; rrdhost_unlock(host); @@ -586,6 +661,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m host->senders_disconnected_time = now_realtime_sec(); host->connected_senders--; if(!host->connected_senders) { + rrdhost_flag_set(host, RRDHOST_ORPHAN); if(health_enabled == CONFIG_BOOLEAN_AUTO) host->health_enabled = 0; } @@ -603,14 +679,16 @@ struct rrdpush_thread { int fd; char *key; char *hostname; + char *registry_hostname; char *machine_guid; char *os; + char *tags; char *client_ip; char *client_port; int update_every; }; -void *rrdpush_receiver_thread(void *ptr) { +static void *rrdpush_receiver_thread(void *ptr) { struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr; if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) @@ -621,13 +699,15 @@ void *rrdpush_receiver_thread(void *ptr) { info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port); + rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->registry_hostname, rpt->machine_guid, rpt->os, rpt->tags, rpt->update_every, rpt->client_ip, rpt->client_port); info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); freez(rpt->key); freez(rpt->hostname); + freez(rpt->registry_hostname); freez(rpt->machine_guid); freez(rpt->os); + freez(rpt->tags); freez(rpt->client_ip); freez(rpt->client_port); freez(rpt); @@ -636,7 +716,7 @@ void *rrdpush_receiver_thread(void *ptr) { return NULL; } -void rrdpush_sender_thread_spawn(RRDHOST *host) { +static void rrdpush_sender_thread_spawn(RRDHOST *host) { rrdhost_wrlock(host); if(!host->rrdpush_spawn) { @@ -646,7 +726,6 @@ void rrdpush_sender_thread_spawn(RRDHOST *host) { else if(pthread_detach(host->rrdpush_thread)) error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname); - rrdhost_flag_clear(host, RRDHOST_ORPHAN); host->rrdpush_spawn = 1; } @@ -658,7 +737,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port); - char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = "unknown"; + char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *tags = NULL; int update_every = default_rrd_update_every; char buf[GUID_LEN + 1]; @@ -674,12 +753,18 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url key = value; else if(!strcmp(name, "hostname")) hostname = value; + else if(!strcmp(name, "registry_hostname")) + registry_hostname = value; else if(!strcmp(name, "machine_guid")) machine_guid = value; else if(!strcmp(name, "update_every")) update_every = (int)strtoul(value, NULL, 0); else if(!strcmp(name, "os")) os = value; + else if(!strcmp(name, "tags")) + tags = value; + else + info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value); } if(!key || !*key) { @@ -704,21 +789,21 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } if(regenerate_guid(key, buf) == -1) { - error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key); + error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your API key is invalid."); return 401; } if(regenerate_guid(machine_guid, buf) == -1) { - error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key); + error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your machine GUID is invalid."); return 404; } if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) { - error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); + error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your API key is not permitted access."); return 401; @@ -732,14 +817,16 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread)); - rpt->fd = w->ifd; - rpt->key = strdupz(key); - rpt->hostname = strdupz(hostname); - rpt->machine_guid = strdupz(machine_guid); - rpt->os = strdupz(os); - rpt->client_ip = strdupz(w->client_ip); - rpt->client_port = strdupz(w->client_port); - rpt->update_every = update_every; + rpt->fd = w->ifd; + rpt->key = strdupz(key); + rpt->hostname = strdupz(hostname); + rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); + rpt->machine_guid = strdupz(machine_guid); + rpt->os = strdupz(os); + rpt->tags = (tags)?strdupz(tags):NULL; + rpt->client_ip = strdupz(w->client_ip); + rpt->client_port = strdupz(w->client_port); + rpt->update_every = update_every; pthread_t thread; debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port); |