diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/streaming/sender.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/streaming/sender.c (renamed from streaming/sender.c) | 220 |
1 files changed, 124 insertions, 96 deletions
diff --git a/streaming/sender.c b/src/streaming/sender.c index 09b67e968..bb617c5fd 100644 --- a/streaming/sender.c +++ b/src/streaming/sender.c @@ -234,11 +234,8 @@ static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY struct custom_host_variables_callback *tmp = struct_ptr; BUFFER *wb = tmp->wb; - if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) { - rrdpush_sender_add_host_variable_to_buffer(wb, rv); - return 1; - } - return 0; + rrdpush_sender_add_host_variable_to_buffer(wb, rv); + return 1; } static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { @@ -789,7 +786,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p "&ml_capable=%d" "&ml_enabled=%d" "&mc_version=%d" - "&tags=%s" "&ver=%u" "&NETDATA_INSTANCE_CLOUD_TYPE=%s" "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s" @@ -835,7 +831,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p , host->system_info->ml_capable , host->system_info->ml_enabled , host->system_info->mc_version - , rrdhost_tags(host) , s->capabilities , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : "" , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : "" @@ -952,6 +947,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p nd_log(NDLS_DAEMON, NDLP_WARNING, "STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + sock_setcloexec(s->rrdpush_sender_socket); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) nd_log(NDLS_DAEMON, NDLP_WARNING, @@ -1118,9 +1114,8 @@ struct inflight_stream_function { usec_t received_ut; }; -void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { +static void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { struct inflight_stream_function *tmp = data; - struct sender_state *s = tmp->sender; if(rrdhost_can_send_definitions_to_parent(s->host)) { @@ -1129,7 +1124,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { pluginsd_function_result_begin_to_buffer(wb , string2str(tmp->transaction) , code - , functions_content_type_to_format(func_wb->content_type) + , content_type_id2string(func_wb->content_type) , func_wb->expires); buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); @@ -1150,6 +1145,77 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { freez(tmp); } +static void stream_execute_function_progress_callback(void *data, size_t done, size_t all) { + struct inflight_stream_function *tmp = data; + struct sender_state *s = tmp->sender; + + if(rrdhost_can_send_definitions_to_parent(s->host)) { + BUFFER *wb = sender_start(s); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_FUNCTION_PROGRESS " '%s' %zu %zu\n", + string2str(tmp->transaction), done, all); + + sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); + } +} + +static void execute_commands_function(struct sender_state *s, const char *command, const char *transaction, const char *timeout_s, const char *function, BUFFER *payload, const char *access, const char *source) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + command, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; + + struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function)); + tmp->received_ut = now_realtime_usec(); + tmp->sender = s; + tmp->transaction = string_strdupz(transaction); + BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions); + + int code = rrd_function_run(s->host, wb, timeout, + http_access_from_hex_mapping_old_roles(access), function, false, transaction, + stream_execute_function_callback, tmp, + stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL, + stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL, + NULL, NULL, payload, source); + + if(code != HTTP_RESP_OK) { + if (!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to route request to collector", code); + } + } +} + +static void cleanup_intercepting_input(struct sender_state *s) { + freez((void *)s->functions.transaction); + freez((void *)s->functions.timeout_s); + freez((void *)s->functions.function); + freez((void *)s->functions.access); + freez((void *)s->functions.source); + buffer_free(s->functions.payload); + + s->functions.transaction = NULL; + s->functions.timeout_s = NULL; + s->functions.function = NULL; + s->functions.payload = NULL; + s->functions.access = NULL; + s->functions.source = NULL; + s->functions.intercept_input = false; +} + +static void execute_commands_cleanup(struct sender_state *s) { + cleanup_intercepting_input(s); +} + // This is just a placeholder until the gap filling state machine is inserted void execute_commands(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_EXECUTE); @@ -1163,109 +1229,70 @@ void execute_commands(struct sender_state *s) { char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; while( start < end && (newline = strchr(start, '\n')) ) { - *newline = '\0'; + s->line.count++; + + if(s->functions.intercept_input) { + if(strcmp(start, PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") == 0) { + execute_commands_function(s, + PLUGINSD_CALL_FUNCTION_PAYLOAD_END, + s->functions.transaction, s->functions.timeout_s, + s->functions.function, s->functions.payload, + s->functions.access, s->functions.source); + + cleanup_intercepting_input(s); + } + else + buffer_strcat(s->functions.payload, start); - if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) { - if (buffer_strlen(s->function_payload.payload) != 0) - buffer_strcat(s->function_payload.payload, "\n"); - buffer_strcat(s->function_payload.payload, start); start = newline + 1; continue; } - s->line.count++; + *newline = '\0'; s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS); const char *command = get_word(s->line.words, s->line.num_words, 0); - if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { - worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); - nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - - char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1); - char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2); - char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - rrdhost_hostname(s->host), s->connected_to, - command, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; - - struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function)); - tmp->received_ut = now_realtime_usec(); - tmp->sender = s; - tmp->transaction = string_strdupz(transaction); - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); - - char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL; - int code = rrd_function_run(s->host, wb, timeout, function, false, transaction, - stream_execute_function_callback, tmp, NULL, NULL, payload); - - if(code != HTTP_RESP_OK) { - if (!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to route request to collector", code); - - stream_execute_function_callback(wb, code, tmp); - } - } - - if (s->receiving_function_payload) { - s->receiving_function_payload = false; - - buffer_free(s->function_payload.payload); - freez(s->function_payload.txid); - freez(s->function_payload.timeout); - freez(s->function_payload.fn_name); + if(command && strcmp(command, PLUGINSD_CALL_FUNCTION) == 0) { + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); + char *access = get_word(s->line.words, s->line.num_words, 4); + char *source = get_word(s->line.words, s->line.num_words, 5); - memset(&s->function_payload, 0, sizeof(struct function_payload_state)); - } + execute_commands_function(s, command, transaction, timeout_s, function, NULL, access, source); } - else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { - nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - - if (s->receiving_function_payload) { - netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", - rrdhost_hostname(s->host), s->connected_to, command); - s->receiving_function_payload = false; - buffer_free(s->function_payload.payload); - s->function_payload.payload = NULL; - - // TODO send error response - } + else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0) { + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); + char *access = get_word(s->line.words, s->line.num_words, 4); + char *source = get_word(s->line.words, s->line.num_words, 5); + char *content_type = get_word(s->line.words, s->line.num_words, 6); + + s->functions.transaction = strdupz(transaction ? transaction : ""); + s->functions.timeout_s = strdupz(timeout_s ? timeout_s : ""); + s->functions.function = strdupz(function ? function : ""); + s->functions.access = strdupz(access ? access : ""); + s->functions.source = strdupz(source ? source : ""); + s->functions.payload = buffer_create(0, NULL); + s->functions.payload->content_type = content_type_string2id(content_type); + s->functions.intercept_input = true; + } + else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); char *transaction = get_word(s->line.words, s->line.num_words, 1); - char *timeout_s = get_word(s->line.words, s->line.num_words, 2); - char *function = get_word(s->line.words, s->line.num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - rrdhost_hostname(s->host), s->connected_to, - command, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - - s->receiving_function_payload = true; - s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); - - s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1)); - s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2)); - s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3)); + if(transaction && *transaction) + rrd_function_cancel(transaction); } - else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); char *transaction = get_word(s->line.words, s->line.num_words, 1); if(transaction && *transaction) - rrd_function_cancel(transaction); + rrd_function_progress(transaction); } else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); @@ -1455,6 +1482,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_thread_close_socket(host); rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); + execute_commands_cleanup(host->sender); rrdhost_clear_sender___while_having_sender_mutex(host); @@ -1655,6 +1683,7 @@ void *rrdpush_sender_thread(void *ptr) { now_s = now_monotonic_sec(); rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true); + execute_commands_cleanup(s); rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); s->flags &= ~SENDER_FLAG_OVERFLOW; @@ -1672,7 +1701,6 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); rrdpush_send_global_functions(s->host); - rrdpush_send_dyncfg(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); |