diff options
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 95 |
1 files changed, 84 insertions, 11 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 76843518e..c37b158b1 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -377,6 +377,7 @@ struct { const char *error; int worker_job_id; time_t postpone_reconnect_seconds; + bool prevent_log; } stream_responses[] = { { .response = START_STREAMING_PROMPT_VN, @@ -413,6 +414,7 @@ struct { .error = "remote server rejected this stream, the host we are trying to stream is its localhost", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour + .prevent_log = true, }, { .response = START_STREAMING_ERROR_ALREADY_STREAMING, @@ -422,6 +424,7 @@ struct { .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes + .prevent_log = true, }, { .response = START_STREAMING_ERROR_NOT_PERMITTED, @@ -469,6 +472,7 @@ struct { .error = "remote node response is not understood, is it Netdata?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute + .prevent_log = false, } }; @@ -498,6 +502,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } + bool prevent_log = stream_responses[i].prevent_log; const char *error = stream_responses[i].error; int worker_job_id = stream_responses[i].worker_job_id; time_t delay = stream_responses[i].postpone_reconnect_seconds; @@ -509,13 +514,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender char buf[LOG_DATE_LENGTH]; log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); - netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); + + if(prevent_log) + internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); + else + netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; } -static bool rrdpush_sender_connect_ssl(struct sender_state *s) { +static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #ifdef ENABLE_HTTPS RRDHOST *host = s->host; bool ssl_required = host->destination && host->destination->ssl; @@ -924,12 +934,13 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); - internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", + internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).", rrdhost_hostname(s->host), s->connected_to, string2str(tmp->transaction), buffer_strlen(func_wb), now_realtime_usec() - tmp->received_ut); } + string_freez(tmp->transaction); buffer_free(func_wb); freez(tmp); @@ -944,6 +955,14 @@ void execute_commands(struct sender_state *s) { while( start < end && (newline = strchr(start, '\n')) ) { *newline = '\0'; + if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) { + if (buffer_strlen(s->function_payload.payload) != 0) + buffer_strcat(s->function_payload.payload, "\n"); + buffer_strcat(s->function_payload.payload, start); + start = newline + 1; + continue; + } + netdata_log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); @@ -954,12 +973,12 @@ void execute_commands(struct sender_state *s) { const char *keyword = get_word(words, num_words, 0); - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", @@ -979,12 +998,65 @@ void execute_commands(struct sender_state *s) { tmp->transaction = string_strdupz(transaction); BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); - int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); + char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL; + int code = rrd_function_run(s->host, wb, timeout, function, false, transaction, + stream_execute_function_callback, tmp, NULL, NULL, payload); + if(code != HTTP_RESP_OK) { - rrd_call_function_error(wb, "Failed to route request to collector", code); + if (!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to route request to collector", code); + stream_execute_function_callback(wb, code, tmp); } } + + if (s->receiving_function_payload) { + s->receiving_function_payload = false; + + buffer_free(s->function_payload.payload); + freez(s->function_payload.txid); + freez(s->function_payload.timeout); + freez(s->function_payload.fn_name); + + memset(&s->function_payload, 0, sizeof(struct function_payload_state)); + } + } + else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + if (s->receiving_function_payload) { + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + s->receiving_function_payload = false; + buffer_free(s->function_payload.payload); + s->function_payload.payload = NULL; + + // TODO send error response + } + + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + + s->receiving_function_payload = true; + s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); + + s->function_payload.txid = strdupz(get_word(words, num_words, 1)); + s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); + } + else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + + char *transaction = get_word(words, num_words, 1); + if(transaction && *transaction) + rrd_function_cancel(transaction); } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); @@ -1179,7 +1251,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { freez(s); } -void rrdpush_initialize_ssl_ctx(RRDHOST *host) { +void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) { #ifdef ENABLE_HTTPS static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; spinlock_lock(&sp); @@ -1321,6 +1393,7 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); rrdpush_send_global_functions(s->host); + rrdpush_send_dyncfg(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); |