summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/receiver.c420
1 files changed, 253 insertions, 167 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 10ef8b7d3..a12b94fb4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "web/server/h2o/http_server.h"
extern struct config stream_config;
@@ -28,9 +29,7 @@ void receiver_state_free(struct receiver_state *rpt) {
close(rpt->fd);
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_decompressor_destroy(&rpt->decompressor);
-#endif
if(rpt->system_info)
rrdhost_system_info_free(rpt->system_info);
@@ -59,6 +58,11 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return 0;
}
+#ifdef ENABLE_H2O
+ if (is_h2o_rrdpush(r))
+ return (int)h2o_stream_read(r->h2o_ctx, buffer, size);
+#endif
+
int tries = 100;
ssize_t bytes_read;
@@ -92,15 +96,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return (int)bytes_read;
}
-static inline bool receiver_read_uncompressed(struct receiver_state *r) {
+static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) {
+ if(code > 0)
+ return 0;
+
+ switch(code) {
+ case 0:
+ // asked to read zero bytes
+ return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER;
+
+ case -1:
+ // EOF
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF;
+
+ case -2:
+ // failed to read
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED;
+
+ case -3:
+ // timeout
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT;
+
+ default:
+ // anything else
+ return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ }
+}
+
+static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
#ifdef NETDATA_INTERNAL_CHECKS
if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
- if(unlikely(bytes_read <= 0))
+ if(unlikely(bytes_read <= 0)) {
+ *reason = read_stream_error_to_reason(bytes_read);
return false;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
@@ -111,8 +144,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) {
return true;
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-static inline bool receiver_read_compressed(struct receiver_state *r) {
+static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
"%s: read_buffer does not start with zero #2", __FUNCTION__ );
@@ -150,8 +182,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int bytes_read = 0;
do {
int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
- if (unlikely(ret <= 0))
+ if (unlikely(ret <= 0)) {
+ *reason = read_stream_error_to_reason(ret);
return false;
+ }
bytes_read += ret;
} while(unlikely(bytes_read < (int)r->decompressor.signature_size));
@@ -187,7 +221,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int last_read_bytes = read_stream(r, &compressed[start], remaining);
if (unlikely(last_read_bytes <= 0)) {
- internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes);
+ *reason = read_stream_error_to_reason(last_read_bytes);
return false;
}
@@ -217,57 +251,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
return true;
}
-#else // !ENABLE_RRDPUSH_COMPRESSION
-static inline bool receiver_read_compressed(struct receiver_state *r) {
- return receiver_read_uncompressed(r);
-}
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
-/* Produce a full line if one exists, statefully return where we start next time.
- * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
- */
-inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
- buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
-
- size_t start = reader->pos;
-
- char *ss = &reader->read_buffer[start];
- char *se = &reader->read_buffer[reader->read_len];
- char *ds = &dst->buffer[dst->len];
- char *de = &ds[dst->size - dst->len - 2];
-
- if(ss >= se) {
- *ds = '\0';
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
- }
-
- // copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n') {
- *ds++ = *ss++;
- dst->len++;
- }
-
- // if we have a newline, return the buffer
- if(ss < se && ds < de && *ss == '\n') {
- // newline found in the r->read_buffer
-
- *ds++ = *ss++; // copy the newline too
- dst->len++;
-
- *ds = '\0';
-
- reader->pos = ss - reader->read_buffer;
- return true;
- }
-
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
-}
bool plugin_is_enabled(struct plugind *cd);
@@ -315,6 +298,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
}
+#ifdef ENABLE_H2O
+ parser->h2o_ctx = rpt->h2o_ctx;
+#endif
+
pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
rrd_collector_started();
@@ -323,43 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- bool compressed_connection = false;
+ {
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
- compressed_connection = true;
- rrdpush_decompressor_reset(&rpt->decompressor);
- }
- else
- rrdpush_decompressor_destroy(&rpt->decompressor);
+ buffered_reader_init(&rpt->reader);
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- buffered_reader_init(&rpt->reader);
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- while(!receiver_should_stop(rpt)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
+ while(!receiver_should_stop(rpt)) {
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false);
- break;
- }
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- continue;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- if (unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
+ }
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
- result = parser->user.data_collections_count;
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
+ }
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -400,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
if (rpt->config.alarms_delay > 0) {
host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
- netdata_log_health(
- "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
- rrdhost_hostname(host),
- (int64_t) rpt->config.alarms_delay);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
}
}
@@ -521,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r
5);
}
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
-
- log_stream_connection(rpt->client_ip, rpt->client_port,
- (rpt->key && *rpt->key)? rpt->key : "-",
- (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
- (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
- status);
-
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "%s. "
- "STATUS: %s%s%s%s"
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- , msg
- , status
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
- , stream_handshake_error_to_string(rpt->exit.reason)
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
- );
-
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
+ // this function may be called BEFORE we spawn the receiver thread
+ // so, we need to add the fields again (it does not harm)
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""),
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'"
+ , (rpt->key && *rpt->key)? rpt->key : ""
+ , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : ""
+ , msg);
+
+ nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s"
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""
+ , msg
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
+ , stream_handshake_error_to_string(rpt->exit.reason)
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
+ );
}
static void rrdpush_receive(struct receiver_state *rpt)
@@ -611,11 +619,19 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
-#endif // ENABLE_RRDPUSH_COMPRESSION
+
+ bool is_ephemeral = false;
+ is_ephemeral = appconfig_get_boolean(&stream_config, rpt->key, "is ephemeral node", CONFIG_BOOLEAN_NO);
+ is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral);
+
+ if(rpt->config.rrdpush_compression) {
+ char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER);
+ order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order);
+ rrdpush_parse_compression_order(rpt, order);
+ }
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
@@ -623,39 +639,46 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
// this will also update the host with our system_info
RRDHOST *host = rrdhost_find_or_create(
- rpt->hostname
- , rpt->registry_hostname
- , rpt->machine_guid
- , rpt->os
- , rpt->timezone
- , rpt->abbrev_timezone
- , rpt->utc_offset
- , rpt->tags
- , rpt->program_name
- , rpt->program_version
- , rpt->config.update_every
- , rpt->config.history
- , rpt->config.mode
- , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key)
- , rpt->config.rrdpush_destination
- , rpt->config.rrdpush_api_key
- , rpt->config.rrdpush_send_charts_matching
- , rpt->config.rrdpush_enable_replication
- , rpt->config.rrdpush_seconds_to_replicate
- , rpt->config.rrdpush_replication_step
- , rpt->system_info
- , 0
- );
+ rpt->hostname,
+ rpt->registry_hostname,
+ rpt->machine_guid,
+ rpt->os,
+ rpt->timezone,
+ rpt->abbrev_timezone,
+ rpt->utc_offset,
+ rpt->tags,
+ rpt->program_name,
+ rpt->program_version,
+ rpt->config.update_every,
+ rpt->config.history,
+ rpt->config.mode,
+ (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO),
+ (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination &&
+ *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key &&
+ *rpt->config.rrdpush_api_key),
+ rpt->config.rrdpush_destination,
+ rpt->config.rrdpush_api_key,
+ rpt->config.rrdpush_send_charts_matching,
+ rpt->config.rrdpush_enable_replication,
+ rpt->config.rrdpush_seconds_to_replicate,
+ rpt->config.rrdpush_replication_step,
+ rpt->system_info,
+ 0);
if(!host) {
- rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt,"failed to find/create host structure, rejecting connection",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
- rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ rrdpush_receive_log_status(
+ rpt, "host is initializing, retry later",
+ RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
goto cleanup;
}
@@ -664,7 +687,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->system_info = NULL;
if(!rrdhost_set_receiver(host, rpt)) {
- rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "host is already served by another receiver",
+ RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
goto cleanup;
}
@@ -709,12 +735,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
- if (!rpt->config.rrdpush_compression)
- rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
- }
-#endif // ENABLE_RRDPUSH_COMPRESSION
+ rrdpush_select_receiver_compression_algorithm(rpt);
{
// netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
@@ -737,19 +758,32 @@ static void rrdpush_receive(struct receiver_state *rpt)
}
netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- ssize_t bytes_sent = send_timeout(
+#ifdef ENABLE_H2O
+ if (is_h2o_rrdpush(rpt)) {
+ h2o_stream_write(rpt->h2o_ctx, initial_response, strlen(initial_response));
+ } else {
+#endif
+ ssize_t bytes_sent = send_timeout(
#ifdef ENABLE_HTTPS
- &rpt->ssl,
+ &rpt->ssl,
#endif
- rpt->fd, initial_response, strlen(initial_response), 0, 60);
-
- if(bytes_sent != (ssize_t)strlen(initial_response)) {
- internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
- rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
- goto cleanup;
+ rpt->fd, initial_response, strlen(initial_response), 0, 60);
+
+ if(bytes_sent != (ssize_t)strlen(initial_response)) {
+ internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
+ rrdpush_receive_log_status(
+ rpt, "cannot reply back, dropping connection",
+ RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR);
+ goto cleanup;
+ }
+#ifdef ENABLE_H2O
}
+#endif
}
+#ifdef ENABLE_H2O
+ unless_h2o_rrdpush(rpt)
+#endif
{
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
@@ -770,17 +804,22 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->fd);
}
- rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
+ rrdpush_receive_log_status(
+ rpt, "connected and ready to receive data",
+ RRDPUSH_STATUS_CONNECTED, NDLP_INFO);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 1);
+ aclk_host_state_update(rpt->host, 1, 1);
#endif
rrdhost_set_is_parent_label();
+ if (is_ephemeral)
+ rrdhost_option_set(rpt->host, RRDHOST_OPTION_EPHEMERAL_HOST);
+
// let it reconnect to parent immediately
rrdpush_reset_destinations_postpone_time(rpt->host);
@@ -796,15 +835,17 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
char msg[100 + 1];
- snprintfz(msg, 100, "disconnected (completed %zu updates)", count);
- rrdpush_receive_log_status(rpt, msg, "DISCONNECTED");
+ snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count);
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
}
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a child disconnected
if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 0);
+ aclk_host_state_update(rpt->host, 0, 1);
#endif
cleanup:
@@ -828,19 +869,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
rrdhost_set_is_parent_label();
}
+static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+ stream_capabilities_to_string(wb, rpt->capabilities);
+ return true;
+}
+
+static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
void *rrdpush_receiver_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
- worker_register("STREAMRCV");
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ {
+ worker_register("STREAMRCV");
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
+ "received bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
+ "uncompressed bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- struct receiver_state *rpt = (struct receiver_state *)ptr;
- rpt->tid = gettid();
- netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ "replication completion", "%",
+ WORKER_METRIC_ABSOLUTE);
- rrdpush_receive(rpt);
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ rpt->tid = gettid();
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
+ ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
+ , rpt->client_port);
+
+ rrdpush_receive(rpt);
+ }
netdata_thread_cleanup_pop(1);
return NULL;