summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c215
1 files changed, 204 insertions, 11 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index e8f8528a7..b083766dd 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -22,6 +22,10 @@ void destroy_receiver_state(struct receiver_state *rpt) {
SSL_free(rpt->ssl.conn);
}
#endif
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->destroy(&rpt->decompressor);
+#endif
freez(rpt);
}
@@ -69,15 +73,33 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
time_t gap = 0;
if (prev == 0)
- info("STREAM %s from %s: Initial connection (no gap to check), remote=%ld local=%ld slew=%ld",
- host->hostname, cd->cmd, remote_time, now, now-remote_time);
+ info(
+ "STREAM %s from %s: Initial connection (no gap to check), "
+ "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
+ host->hostname,
+ cd->cmd,
+ (int64_t)remote_time,
+ (int64_t)now,
+ (int64_t)now - remote_time);
else {
gap = now - prev;
- info("STREAM %s from %s: Checking for gaps... remote=%ld local=%ld..%ld slew=%ld %ld-sec gap",
- host->hostname, cd->cmd, remote_time, prev, now, remote_time - now, gap);
+ info(
+ "STREAM %s from %s: Checking for gaps... "
+ "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
+ host->hostname,
+ cd->cmd,
+ (int64_t)remote_time,
+ (int64_t)prev,
+ (int64_t)now,
+ (int64_t)(remote_time - now),
+ (int64_t)gap);
}
char message[128];
- sprintf(message,"REPLICATE %ld %ld\n", remote_time - gap, remote_time);
+ sprintf(
+ message,
+ "REPLICATE %"PRId64" %"PRId64"\n",
+ (int64_t)(remote_time - gap),
+ (int64_t)remote_time);
int ret;
#ifdef ENABLE_HTTPS
SSL *conn = host->stream_ssl.conn ;
@@ -141,6 +163,8 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK;
}
+
+#ifndef ENABLE_COMPRESSION
/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
@@ -168,6 +192,130 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
r->read_len = strlen(r->read_buffer);
return 0;
}
+#else
+/*
+ * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
+ * if SSL encryption is on, then use SSL API for reading stream data.
+ * Use line oriented fgets() in buffer from receiver_state is provided.
+ * In other cases use fread to read binary data from socket.
+ * Return zero on success and the number of bytes were read using pointer in the last argument.
+ */
+static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) {
+ if (!ret)
+ return 1;
+ *ret = 0;
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && !r->ssl.flags) {
+ ERR_clear_error();
+ if (buffer != r->read_buffer + r->read_len) {
+ *ret = SSL_read(r->ssl.conn, buffer, size);
+ if (*ret > 0 )
+ return 0;
+ } else {
+ // we need to receive data with LF to parse compression header
+ size_t ofs = 0;
+ int res = 0;
+ while (ofs < size) {
+ do {
+ res = SSL_read(r->ssl.conn, buffer + ofs, 1);
+ } while (res == 0);
+
+ if (res < 0)
+ break;
+ if (buffer[ofs] == '\n')
+ break;
+ ofs += res;
+ }
+ if (res > 0) {
+ ofs += res;
+ *ret = ofs;
+ buffer[ofs] = 0;
+ return 0;
+ }
+ }
+ // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
+ }
+ return 1;
+ }
+#endif
+ if (buffer != r->read_buffer + r->read_len) {
+ // read to external buffer
+ *ret = fread(buffer, 1, size, fp);
+ if (!*ret)
+ return 1;
+ } else {
+ if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
+ return 1;
+ *ret = strlen(r->read_buffer);
+ }
+ return 0;
+}
+
+/*
+ * Get the next line of data for parsing.
+ * Return data from the decompressor buffer if available.
+ * Otherwise read next line from the socket and check for compression header.
+ * Return the line was read If no compression header was found.
+ * Otherwise read the entire block of compressed data, decompress it
+ * and return it in receiver_state buffer.
+ * Return zero on success.
+ */
+static int receiver_read(struct receiver_state *r, FILE *fp) {
+ // check any decompressed data present
+ if (r->decompressor &&
+ r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ size_t available = sizeof(r->read_buffer) - r->read_len;
+ if (available) {
+ size_t len = r->decompressor->get(r->decompressor,
+ r->read_buffer + r->read_len, available);
+ if (!len)
+ return 1;
+ r->read_len += len;
+ }
+ return 0;
+ }
+ int ret = 0;
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+ return 1;
+
+ if (!is_compressed_data(r->read_buffer, ret)) {
+ r->read_len += ret;
+ return 0;
+ }
+
+ if (unlikely(!r->decompressor))
+ r->decompressor = create_decompressor();
+
+ size_t bytes_to_read = r->decompressor->start(r->decompressor,
+ r->read_buffer, ret);
+
+ // Read the entire block of compressed data because
+ // we're unable to decompress incomplete block
+ char compressed[bytes_to_read];
+ do {
+ if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+ return 1;
+ // Send input data to decompressor
+ if (ret)
+ r->decompressor->put(r->decompressor, compressed, ret);
+ bytes_to_read -= ret;
+ } while (bytes_to_read > 0);
+ // Decompress
+ size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
+ if (!bytes_to_parse)
+ return 1;
+ // Fill read buffer with decompressed data
+ r->read_len = r->decompressor->get(r->decompressor,
+ r->read_buffer, sizeof(r->read_buffer));
+ return 0;
+}
+
+#endif
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
@@ -190,7 +338,6 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) {
return NULL;
}
-
size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
size_t result;
PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
@@ -226,7 +373,11 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
user->parser = parser;
- do {
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->reset(rpt->decompressor);
+#endif
+ do{
if (receiver_read(rpt, fp))
break;
int pos = 0;
@@ -293,6 +444,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression = default_compression_enabled;
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
+ rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+#endif //ENABLE_COMPRESSION
+
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
@@ -345,6 +503,31 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
netdata_mutex_unlock(&rpt->host->receiver_lock);
}
+ else {
+ rrd_wrlock();
+ rrdhost_update(
+ rpt->host,
+ rpt->hostname,
+ rpt->registry_hostname,
+ rpt->machine_guid,
+ rpt->os,
+ rpt->timezone,
+ rpt->abbrev_timezone,
+ rpt->utc_offset,
+ rpt->tags,
+ rpt->program_name,
+ rpt->program_version,
+ rpt->update_every,
+ history,
+ mode,
+ (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO),
+ (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key),
+ rrdpush_destination,
+ rrdpush_api_key,
+ rrdpush_send_charts_matching,
+ rpt->system_info);
+ rrd_unlock();
+ }
#ifdef NETDATA_INTERNAL_CHECKS
int ssl = 0;
@@ -389,6 +572,16 @@ static int rrdpush_receive(struct receiver_state *rpt)
info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (rpt->stream_version > 1) {
+ if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
+#ifdef ENABLE_COMPRESSION
+ if(!rpt->rrdpush_compression)
+ rpt->stream_version = STREAM_VERSION_CLABELS;
+#else
+ if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
+ rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
+ }
+#endif
+ }
info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
} else if (rpt->stream_version == 1) {
@@ -441,10 +634,10 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
- info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
- , alarms_delay
- , rpt->host->hostname
- );
+ info(
+ "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.",
+ (int64_t)alarms_delay,
+ rpt->host->hostname);
}
}
rrdhost_unlock(rpt->host);