summaryrefslogtreecommitdiffstats
path: root/src/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/sender.c (renamed from streaming/sender.c)220
1 files changed, 124 insertions, 96 deletions
diff --git a/streaming/sender.c b/src/streaming/sender.c
index 09b67e968..bb617c5fd 100644
--- a/streaming/sender.c
+++ b/src/streaming/sender.c
@@ -234,11 +234,8 @@ static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY
struct custom_host_variables_callback *tmp = struct_ptr;
BUFFER *wb = tmp->wb;
- if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
- rrdpush_sender_add_host_variable_to_buffer(wb, rv);
- return 1;
- }
- return 0;
+ rrdpush_sender_add_host_variable_to_buffer(wb, rv);
+ return 1;
}
static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
@@ -789,7 +786,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
"&ml_capable=%d"
"&ml_enabled=%d"
"&mc_version=%d"
- "&tags=%s"
"&ver=%u"
"&NETDATA_INSTANCE_CLOUD_TYPE=%s"
"&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
@@ -835,7 +831,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
, host->system_info->ml_capable
, host->system_info->ml_enabled
, host->system_info->mc_version
- , rrdhost_tags(host)
, s->capabilities
, (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
, (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
@@ -952,6 +947,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
nd_log(NDLS_DAEMON, NDLP_WARNING,
"STREAM %s [send to %s]: cannot set non-blocking mode for socket.",
rrdhost_hostname(host), s->connected_to);
+ sock_setcloexec(s->rrdpush_sender_socket);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
nd_log(NDLS_DAEMON, NDLP_WARNING,
@@ -1118,9 +1114,8 @@ struct inflight_stream_function {
usec_t received_ut;
};
-void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
+static void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
struct inflight_stream_function *tmp = data;
-
struct sender_state *s = tmp->sender;
if(rrdhost_can_send_definitions_to_parent(s->host)) {
@@ -1129,7 +1124,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_begin_to_buffer(wb
, string2str(tmp->transaction)
, code
- , functions_content_type_to_format(func_wb->content_type)
+ , content_type_id2string(func_wb->content_type)
, func_wb->expires);
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
@@ -1150,6 +1145,77 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
freez(tmp);
}
+static void stream_execute_function_progress_callback(void *data, size_t done, size_t all) {
+ struct inflight_stream_function *tmp = data;
+ struct sender_state *s = tmp->sender;
+
+ if(rrdhost_can_send_definitions_to_parent(s->host)) {
+ BUFFER *wb = sender_start(s);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_FUNCTION_PROGRESS " '%s' %zu %zu\n",
+ string2str(tmp->transaction), done, all);
+
+ sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
+ }
+}
+
+static void execute_commands_function(struct sender_state *s, const char *command, const char *transaction, const char *timeout_s, const char *function, BUFFER *payload, const char *access, const char *source) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ command,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+ if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+
+ struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
+ tmp->received_ut = now_realtime_usec();
+ tmp->sender = s;
+ tmp->transaction = string_strdupz(transaction);
+ BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions);
+
+ int code = rrd_function_run(s->host, wb, timeout,
+ http_access_from_hex_mapping_old_roles(access), function, false, transaction,
+ stream_execute_function_callback, tmp,
+ stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL,
+ stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL,
+ NULL, NULL, payload, source);
+
+ if(code != HTTP_RESP_OK) {
+ if (!buffer_strlen(wb))
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+ }
+ }
+}
+
+static void cleanup_intercepting_input(struct sender_state *s) {
+ freez((void *)s->functions.transaction);
+ freez((void *)s->functions.timeout_s);
+ freez((void *)s->functions.function);
+ freez((void *)s->functions.access);
+ freez((void *)s->functions.source);
+ buffer_free(s->functions.payload);
+
+ s->functions.transaction = NULL;
+ s->functions.timeout_s = NULL;
+ s->functions.function = NULL;
+ s->functions.payload = NULL;
+ s->functions.access = NULL;
+ s->functions.source = NULL;
+ s->functions.intercept_input = false;
+}
+
+static void execute_commands_cleanup(struct sender_state *s) {
+ cleanup_intercepting_input(s);
+}
+
// This is just a placeholder until the gap filling state machine is inserted
void execute_commands(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
@@ -1163,109 +1229,70 @@ void execute_commands(struct sender_state *s) {
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
- *newline = '\0';
+ s->line.count++;
+
+ if(s->functions.intercept_input) {
+ if(strcmp(start, PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") == 0) {
+ execute_commands_function(s,
+ PLUGINSD_CALL_FUNCTION_PAYLOAD_END,
+ s->functions.transaction, s->functions.timeout_s,
+ s->functions.function, s->functions.payload,
+ s->functions.access, s->functions.source);
+
+ cleanup_intercepting_input(s);
+ }
+ else
+ buffer_strcat(s->functions.payload, start);
- if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
- if (buffer_strlen(s->function_payload.payload) != 0)
- buffer_strcat(s->function_payload.payload, "\n");
- buffer_strcat(s->function_payload.payload, start);
start = newline + 1;
continue;
}
- s->line.count++;
+ *newline = '\0';
s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS);
const char *command = get_word(s->line.words, s->line.num_words, 0);
- if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
- worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
- nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
-
- char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1);
- char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2);
- char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- rrdhost_hostname(s->host), s->connected_to,
- command,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
- if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
-
- struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
- tmp->received_ut = now_realtime_usec();
- tmp->sender = s;
- tmp->transaction = string_strdupz(transaction);
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
-
- char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
- int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
- stream_execute_function_callback, tmp, NULL, NULL, payload);
-
- if(code != HTTP_RESP_OK) {
- if (!buffer_strlen(wb))
- rrd_call_function_error(wb, "Failed to route request to collector", code);
-
- stream_execute_function_callback(wb, code, tmp);
- }
- }
-
- if (s->receiving_function_payload) {
- s->receiving_function_payload = false;
-
- buffer_free(s->function_payload.payload);
- freez(s->function_payload.txid);
- freez(s->function_payload.timeout);
- freez(s->function_payload.fn_name);
+ if(command && strcmp(command, PLUGINSD_CALL_FUNCTION) == 0) {
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
+ char *access = get_word(s->line.words, s->line.num_words, 4);
+ char *source = get_word(s->line.words, s->line.num_words, 5);
- memset(&s->function_payload, 0, sizeof(struct function_payload_state));
- }
+ execute_commands_function(s, command, transaction, timeout_s, function, NULL, access, source);
}
- else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
- nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
-
- if (s->receiving_function_payload) {
- netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload",
- rrdhost_hostname(s->host), s->connected_to, command);
- s->receiving_function_payload = false;
- buffer_free(s->function_payload.payload);
- s->function_payload.payload = NULL;
-
- // TODO send error response
- }
+ else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0) {
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
+ char *access = get_word(s->line.words, s->line.num_words, 4);
+ char *source = get_word(s->line.words, s->line.num_words, 5);
+ char *content_type = get_word(s->line.words, s->line.num_words, 6);
+
+ s->functions.transaction = strdupz(transaction ? transaction : "");
+ s->functions.timeout_s = strdupz(timeout_s ? timeout_s : "");
+ s->functions.function = strdupz(function ? function : "");
+ s->functions.access = strdupz(access ? access : "");
+ s->functions.source = strdupz(source ? source : "");
+ s->functions.payload = buffer_create(0, NULL);
+ s->functions.payload->content_type = content_type_string2id(content_type);
+ s->functions.intercept_input = true;
+ }
+ else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
char *transaction = get_word(s->line.words, s->line.num_words, 1);
- char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
- char *function = get_word(s->line.words, s->line.num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- rrdhost_hostname(s->host), s->connected_to,
- command,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
-
- s->receiving_function_payload = true;
- s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
-
- s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1));
- s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2));
- s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3));
+ if(transaction && *transaction)
+ rrd_function_cancel(transaction);
}
- else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ else if(command && strcmp(command, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
char *transaction = get_word(s->line.words, s->line.num_words, 1);
if(transaction && *transaction)
- rrd_function_cancel(transaction);
+ rrd_function_progress(transaction);
}
else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
@@ -1455,6 +1482,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_thread_close_socket(host);
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
+ execute_commands_cleanup(host->sender);
rrdhost_clear_sender___while_having_sender_mutex(host);
@@ -1655,6 +1683,7 @@ void *rrdpush_sender_thread(void *ptr) {
now_s = now_monotonic_sec();
rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
+ execute_commands_cleanup(s);
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
s->flags &= ~SENDER_FLAG_OVERFLOW;
@@ -1672,7 +1701,6 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
rrdpush_send_global_functions(s->host);
- rrdpush_send_dyncfg(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);