diff options
Diffstat (limited to '')
-rw-r--r-- | src/rrdpush.c | 160 |
1 files changed, 99 insertions, 61 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c index 6def90fe5..c1d052fd8 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -113,12 +113,12 @@ static inline void send_chart_definition(RRDSET *st) { // sends the current chart dimensions static inline void send_chart_metrics(RRDSET *st) { - buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); + buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->upstream_resync_time > st->last_collected_time.tv_sec)?st->usec_since_last_update:0); RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rd->updated && rd->exposed) - buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n" + buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n" , rd->id , rd->collected_value ); @@ -380,7 +380,7 @@ void *rrdpush_sender_thread(void *ptr) { // allow appending data into rrdpush_buffer host->rrdpush_connected = 1; - debug(D_STREAM, "Connected..."); + debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_socket); } ifd->fd = host->rrdpush_pipe[PIPE_READ]; @@ -389,13 +389,13 @@ void *rrdpush_sender_thread(void *ptr) { ofd->fd = host->rrdpush_socket; ofd->revents = 0; - if(begin < buffer_strlen(host->rrdpush_buffer)) { - debug(D_STREAM, "STREAM: Requesting data output on streaming socket..."); + if(ofd->fd != -1 && begin < buffer_strlen(host->rrdpush_buffer)) { + debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd); ofd->events = POLLOUT; fdmax = 2; } else { - debug(D_STREAM, "STREAM: Not requesting data output on streaming socket (nothing to send now)..."); + debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd); ofd->events = 0; fdmax = 1; } @@ -419,7 +419,7 @@ void *rrdpush_sender_thread(void *ptr) { break; } else if(likely(retval)) { - if (ifd->revents & POLLIN) { + if (ifd->revents & POLLIN || ifd->revents & POLLPRI) { debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); char buffer[1000 + 1]; @@ -427,67 +427,98 @@ void *rrdpush_sender_thread(void *ptr) { error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to); } - if (ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) { - debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_buffer)); - - // BEGIN RRDPUSH LOCKED SESSION - - // during this session, data collectors - // will not be able to append data to our buffer - // but the socket is in non-blocking mode - // so, we will not block at send() - - if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname); - - debug(D_STREAM, "STREAM: Getting exclusive lock on host..."); - rrdpush_lock(host); - - debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_buffer)); - ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT); - if (unlikely(ret == -1)) { - if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { - debug(D_STREAM, "STREAM: Send failed - closing socket..."); - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); - close(host->rrdpush_socket); - host->rrdpush_socket = -1; + if (ofd->revents & POLLOUT) { + if (begin < buffer_strlen(host->rrdpush_buffer)) { + debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_buffer), begin); + + // BEGIN RRDPUSH LOCKED SESSION + + // during this session, data collectors + // will not be able to append data to our buffer + // but the socket is in non-blocking mode + // so, we will not block at send() + + if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) + error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname); + + debug(D_STREAM, "STREAM: Getting exclusive lock on host..."); + rrdpush_lock(host); + + debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_buffer)); + ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT); + if (unlikely(ret == -1)) { + if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { + debug(D_STREAM, "STREAM: Send failed - closing socket..."); + error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; + } + else { + debug(D_STREAM, "STREAM: Send failed - will retry..."); + } + } + else if (likely(ret > 0)) { + // DEBUG - dump the scring to see it + //char c = host->rrdpush_buffer->buffer[begin + ret]; + //host->rrdpush_buffer->buffer[begin + ret] = '\0'; + //debug(D_STREAM, "STREAM: sent from %zu to %zd:\n%s\n", begin, ret, &host->rrdpush_buffer->buffer[begin]); + //host->rrdpush_buffer->buffer[begin + ret] = c; + + sent_connection += ret; + sent_bytes += ret; + begin += ret; + + if (begin == buffer_strlen(host->rrdpush_buffer)) { + // we send it all + + debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret); + buffer_flush(host->rrdpush_buffer); + begin = 0; + } + else { + debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret); + } + + last_sent_t = now_monotonic_sec(); } else { - debug(D_STREAM, "STREAM: Send failed - will retry..."); + debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret); + error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.", + host->hostname, connected_to, ret, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; } - } - else if(likely(ret > 0)) { - sent_connection += ret; - sent_bytes += ret; - begin += ret; - if (begin == buffer_strlen(host->rrdpush_buffer)) { - // we send it all + debug(D_STREAM, "STREAM: Releasing exclusive lock on host..."); + rrdpush_unlock(host); - debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret); - buffer_flush(host->rrdpush_buffer); - begin = 0; - } - else { - debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret); - } + if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) + error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - last_sent_t = now_monotonic_sec(); + // END RRDPUSH LOCKED SESSION } else { - debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret); - error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, ret, sent_connection); - close(host->rrdpush_socket); - host->rrdpush_socket = -1; + debug(D_STREAM, "STREAM: we have sent the entire buffer, but we received POLLOUT..."); } + } - debug(D_STREAM, "STREAM: Releasing exclusive lock on host..."); - rrdpush_unlock(host); - - if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - - // END RRDPUSH LOCKED SESSION + if(unlikely(ofd->revents & POLLERR)) { + debug(D_STREAM, "STREAM: Send failed (POLLERR) - closing socket..."); + error("STREAM %s [send to %s]: connection reports errors (POLLERR), closing it - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; + } + else if(unlikely(ofd->revents & POLLHUP)) { + debug(D_STREAM, "STREAM: Send failed (POLLHUP) - closing socket..."); + error("STREAM %s [send to %s]: connection closed by remote end (POLLHUP) - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; + } + else if(unlikely(ofd->revents & POLLNVAL)) { + debug(D_STREAM, "STREAM: Send failed (POLLNVAL) - closing socket..."); + error("STREAM %s [send to %s]: connection is invalid (POLLNVAL), closing it - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_connection); + close(host->rrdpush_socket); + host->rrdpush_socket = -1; } } else { @@ -602,7 +633,7 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const , host->rrd_history_entries , rrd_memory_mode_name(host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") - , host->tags + , host->tags?host->tags:"" ); #endif // NETDATA_INTERNAL_CHECKS @@ -648,8 +679,15 @@ static int rrdpush_receive(int fd, const char *key, const char *hostname, const host->connected_senders++; rrdhost_flag_clear(host, RRDHOST_ORPHAN); - if(health_enabled != CONFIG_BOOLEAN_NO) - host->health_delay_up_to = now_realtime_sec() + alarms_delay; + if(health_enabled != CONFIG_BOOLEAN_NO) { + if(alarms_delay > 0) { + host->health_delay_up_to = now_realtime_sec() + alarms_delay; + info("Postponing health checks for %ld seconds, on host '%s', because it was just connected." + , alarms_delay + , host->hostname + ); + } + } rrdhost_unlock(host); // call the plugins.d processor to receive the metrics |