diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 215 |
1 files changed, 204 insertions, 11 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index e8f8528a7..b083766dd 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -22,6 +22,10 @@ void destroy_receiver_state(struct receiver_state *rpt) { SSL_free(rpt->ssl.conn); } #endif +#ifdef ENABLE_COMPRESSION + if (rpt->decompressor) + rpt->decompressor->destroy(&rpt->decompressor); +#endif freez(rpt); } @@ -69,15 +73,33 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host); time_t gap = 0; if (prev == 0) - info("STREAM %s from %s: Initial connection (no gap to check), remote=%ld local=%ld slew=%ld", - host->hostname, cd->cmd, remote_time, now, now-remote_time); + info( + "STREAM %s from %s: Initial connection (no gap to check), " + "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", + host->hostname, + cd->cmd, + (int64_t)remote_time, + (int64_t)now, + (int64_t)now - remote_time); else { gap = now - prev; - info("STREAM %s from %s: Checking for gaps... remote=%ld local=%ld..%ld slew=%ld %ld-sec gap", - host->hostname, cd->cmd, remote_time, prev, now, remote_time - now, gap); + info( + "STREAM %s from %s: Checking for gaps... " + "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", + host->hostname, + cd->cmd, + (int64_t)remote_time, + (int64_t)prev, + (int64_t)now, + (int64_t)(remote_time - now), + (int64_t)gap); } char message[128]; - sprintf(message,"REPLICATE %ld %ld\n", remote_time - gap, remote_time); + sprintf( + message, + "REPLICATE %"PRId64" %"PRId64"\n", + (int64_t)(remote_time - gap), + (int64_t)remote_time); int ret; #ifdef ENABLE_HTTPS SSL *conn = host->stream_ssl.conn ; @@ -141,6 +163,8 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; } + +#ifndef ENABLE_COMPRESSION /* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. */ static int receiver_read(struct receiver_state *r, FILE *fp) { @@ -168,6 +192,130 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { r->read_len = strlen(r->read_buffer); return 0; } +#else +/* + * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. + * if SSL encryption is on, then use SSL API for reading stream data. + * Use line oriented fgets() in buffer from receiver_state is provided. + * In other cases use fread to read binary data from socket. + * Return zero on success and the number of bytes were read using pointer in the last argument. + */ +static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) { + if (!ret) + return 1; + *ret = 0; +#ifdef ENABLE_HTTPS + if (r->ssl.conn && !r->ssl.flags) { + ERR_clear_error(); + if (buffer != r->read_buffer + r->read_len) { + *ret = SSL_read(r->ssl.conn, buffer, size); + if (*ret > 0 ) + return 0; + } else { + // we need to receive data with LF to parse compression header + size_t ofs = 0; + int res = 0; + while (ofs < size) { + do { + res = SSL_read(r->ssl.conn, buffer + ofs, 1); + } while (res == 0); + + if (res < 0) + break; + if (buffer[ofs] == '\n') + break; + ofs += res; + } + if (res > 0) { + ofs += res; + *ret = ofs; + buffer[ofs] = 0; + return 0; + } + } + // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket + u_long err; + char buf[256]; + while ((err = ERR_get_error()) != 0) { + ERR_error_string_n(err, buf, sizeof(buf)); + error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); + } + return 1; + } +#endif + if (buffer != r->read_buffer + r->read_len) { + // read to external buffer + *ret = fread(buffer, 1, size, fp); + if (!*ret) + return 1; + } else { + if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) + return 1; + *ret = strlen(r->read_buffer); + } + return 0; +} + +/* + * Get the next line of data for parsing. + * Return data from the decompressor buffer if available. + * Otherwise read next line from the socket and check for compression header. + * Return the line was read If no compression header was found. + * Otherwise read the entire block of compressed data, decompress it + * and return it in receiver_state buffer. + * Return zero on success. + */ +static int receiver_read(struct receiver_state *r, FILE *fp) { + // check any decompressed data present + if (r->decompressor && + r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + size_t available = sizeof(r->read_buffer) - r->read_len; + if (available) { + size_t len = r->decompressor->get(r->decompressor, + r->read_buffer + r->read_len, available); + if (!len) + return 1; + r->read_len += len; + } + return 0; + } + int ret = 0; + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) + return 1; + + if (!is_compressed_data(r->read_buffer, ret)) { + r->read_len += ret; + return 0; + } + + if (unlikely(!r->decompressor)) + r->decompressor = create_decompressor(); + + size_t bytes_to_read = r->decompressor->start(r->decompressor, + r->read_buffer, ret); + + // Read the entire block of compressed data because + // we're unable to decompress incomplete block + char compressed[bytes_to_read]; + do { + if (read_stream(r, fp, compressed, bytes_to_read, &ret)) + return 1; + // Send input data to decompressor + if (ret) + r->decompressor->put(r->decompressor, compressed, ret); + bytes_to_read -= ret; + } while (bytes_to_read > 0); + // Decompress + size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); + if (!bytes_to_parse) + return 1; + // Fill read buffer with decompressed data + r->read_len = r->decompressor->get(r->decompressor, + r->read_buffer, sizeof(r->read_buffer)); + return 0; +} + +#endif /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. @@ -190,7 +338,6 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) { return NULL; } - size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { size_t result; PARSER_USER_OBJECT *user = callocz(1, sizeof(*user)); @@ -226,7 +373,11 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp user->parser = parser; - do { +#ifdef ENABLE_COMPRESSION + if (rpt->decompressor) + rpt->decompressor->reset(rpt->decompressor); +#endif + do{ if (receiver_read(rpt, fp)) break; int pos = 0; @@ -293,6 +444,13 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); +#ifdef ENABLE_COMPRESSION + unsigned int rrdpush_compression = default_compression_enabled; + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); + rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); +#endif //ENABLE_COMPRESSION + (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { @@ -345,6 +503,31 @@ static int rrdpush_receive(struct receiver_state *rpt) } netdata_mutex_unlock(&rpt->host->receiver_lock); } + else { + rrd_wrlock(); + rrdhost_update( + rpt->host, + rpt->hostname, + rpt->registry_hostname, + rpt->machine_guid, + rpt->os, + rpt->timezone, + rpt->abbrev_timezone, + rpt->utc_offset, + rpt->tags, + rpt->program_name, + rpt->program_version, + rpt->update_every, + history, + mode, + (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO), + (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key), + rrdpush_destination, + rrdpush_api_key, + rrdpush_send_charts_matching, + rpt->system_info); + rrd_unlock(); + } #ifdef NETDATA_INTERNAL_CHECKS int ssl = 0; @@ -389,6 +572,16 @@ static int rrdpush_receive(struct receiver_state *rpt) info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (rpt->stream_version > 1) { + if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ +#ifdef ENABLE_COMPRESSION + if(!rpt->rrdpush_compression) + rpt->stream_version = STREAM_VERSION_CLABELS; +#else + if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) { + rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; + } +#endif + } info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); } else if (rpt->stream_version == 1) { @@ -441,10 +634,10 @@ static int rrdpush_receive(struct receiver_state *rpt) if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; - info("Postponing health checks for %ld seconds, on host '%s', because it was just connected." - , alarms_delay - , rpt->host->hostname - ); + info( + "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.", + (int64_t)alarms_delay, + rpt->host->hostname); } } rrdhost_unlock(rpt->host); |