summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/sender.c95
1 files changed, 84 insertions, 11 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 76843518e..c37b158b1 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -377,6 +377,7 @@ struct {
const char *error;
int worker_job_id;
time_t postpone_reconnect_seconds;
+ bool prevent_log;
} stream_responses[] = {
{
.response = START_STREAMING_PROMPT_VN,
@@ -413,6 +414,7 @@ struct {
.error = "remote server rejected this stream, the host we are trying to stream is its localhost",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
+ .prevent_log = true,
},
{
.response = START_STREAMING_ERROR_ALREADY_STREAMING,
@@ -422,6 +424,7 @@ struct {
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .prevent_log = true,
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
@@ -469,6 +472,7 @@ struct {
.error = "remote node response is not understood, is it Netdata?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .prevent_log = false,
}
};
@@ -498,6 +502,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
+ bool prevent_log = stream_responses[i].prevent_log;
const char *error = stream_responses[i].error;
int worker_job_id = stream_responses[i].worker_job_id;
time_t delay = stream_responses[i].postpone_reconnect_seconds;
@@ -509,13 +514,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
char buf[LOG_DATE_LENGTH];
log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
- netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
+
+ if(prevent_log)
+ internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
+ else
+ netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
}
-static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
+static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#ifdef ENABLE_HTTPS
RRDHOST *host = s->host;
bool ssl_required = host->destination && host->destination->ssl;
@@ -924,12 +934,13 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
- internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
+ internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).",
rrdhost_hostname(s->host), s->connected_to,
string2str(tmp->transaction),
buffer_strlen(func_wb),
now_realtime_usec() - tmp->received_ut);
}
+
string_freez(tmp->transaction);
buffer_free(func_wb);
freez(tmp);
@@ -944,6 +955,14 @@ void execute_commands(struct sender_state *s) {
while( start < end && (newline = strchr(start, '\n')) ) {
*newline = '\0';
+ if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
+ if (buffer_strlen(s->function_payload.payload) != 0)
+ buffer_strcat(s->function_payload.payload, "\n");
+ buffer_strcat(s->function_payload.payload, start);
+ start = newline + 1;
+ continue;
+ }
+
netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
@@ -954,12 +973,12 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
@@ -979,12 +998,65 @@ void execute_commands(struct sender_state *s) {
tmp->transaction = string_strdupz(transaction);
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
- int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
+ char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
+ int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
+ stream_execute_function_callback, tmp, NULL, NULL, payload);
+
if(code != HTTP_RESP_OK) {
- rrd_call_function_error(wb, "Failed to route request to collector", code);
+ if (!buffer_strlen(wb))
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+
stream_execute_function_callback(wb, code, tmp);
}
}
+
+ if (s->receiving_function_payload) {
+ s->receiving_function_payload = false;
+
+ buffer_free(s->function_payload.payload);
+ freez(s->function_payload.txid);
+ freez(s->function_payload.timeout);
+ freez(s->function_payload.fn_name);
+
+ memset(&s->function_payload, 0, sizeof(struct function_payload_state));
+ }
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ if (s->receiving_function_payload) {
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ s->receiving_function_payload = false;
+ buffer_free(s->function_payload.payload);
+ s->function_payload.payload = NULL;
+
+ // TODO send error response
+ }
+
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+
+ s->receiving_function_payload = true;
+ s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
+
+ s->function_payload.txid = strdupz(get_word(words, num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
+ }
+ else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
+ char *transaction = get_word(words, num_words, 1);
+ if(transaction && *transaction)
+ rrd_function_cancel(transaction);
}
else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
@@ -1179,7 +1251,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(s);
}
-void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
+void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
#ifdef ENABLE_HTTPS
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
spinlock_lock(&sp);
@@ -1321,6 +1393,7 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
rrdpush_send_global_functions(s->host);
+ rrdpush_send_dyncfg(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);