From 83ba6762cc43d9db581b979bb5e3445669e46cc2 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 25 Nov 2024 18:33:56 +0100 Subject: Merging upstream version 2.0.3+dfsg (Closes: #923993, #1042533, #1045145). Signed-off-by: Daniel Baumann --- src/libnetdata/functions_evloop/functions_evloop.c | 12 ++++++++++-- src/libnetdata/functions_evloop/functions_evloop.h | 16 ++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) (limited to 'src/libnetdata/functions_evloop') diff --git a/src/libnetdata/functions_evloop/functions_evloop.c b/src/libnetdata/functions_evloop/functions_evloop.c index 5000d038f..fd0061844 100644 --- a/src/libnetdata/functions_evloop/functions_evloop.c +++ b/src/libnetdata/functions_evloop/functions_evloop.c @@ -137,6 +137,8 @@ static void worker_add_job(struct functions_evloop_globals *wg, const char *keyw function?function:"(unset)"); } else { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "WORKER JOB WITH PAYLOAD '%s'", payload ? buffer_tostring(payload) : "NONE"); + int timeout = str2i(timeout_s); const char *msg = "No function with this name found"; @@ -222,6 +224,8 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { char *s = (char *)buffer_tostring(buffer); if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD END"); + if(deferred.last_len > 0) // remove the trailing newline from the buffer deferred.last_len--; @@ -249,11 +253,12 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { } char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS); + size_t num_words = quoted_strings_splitter_whitespace((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS); const char *keyword = get_word(words, num_words, 0); if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CALL"); char *transaction = get_word(words, num_words, 1); char *timeout_s = get_word(words, num_words, 2); char *function = get_word(words, num_words, 3); @@ -262,6 +267,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source); } else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD CALL"); char *transaction = get_word(words, num_words, 1); char *timeout_s = get_word(words, num_words, 2); char *function = get_word(words, num_words, 3); @@ -279,6 +285,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { deferred.enabled = true; } else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CANCEL"); char *transaction = get_word(words, num_words, 1); const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); if(acquired) { @@ -292,6 +299,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction); } else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { + // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PROGRESS"); char *transaction = get_word(words, num_words, 1); const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); if(acquired) { @@ -305,7 +313,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) { nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction); } else - nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)"); + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword ? keyword : "(unset)"); buffer_flush(buffer); } diff --git a/src/libnetdata/functions_evloop/functions_evloop.h b/src/libnetdata/functions_evloop/functions_evloop.h index 5c575bd17..35defe355 100644 --- a/src/libnetdata/functions_evloop/functions_evloop.h +++ b/src/libnetdata/functions_evloop/functions_evloop.h @@ -71,6 +71,14 @@ #define PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS "status" #define PLUGINSD_FUNCTION_CONFIG "config" +// claiming +#define PLUGINSD_KEYWORD_NODE_ID "NODE_ID" +#define PLUGINSD_KEYWORD_CLAIMED_ID "CLAIMED_ID" + +#define PLUGINSD_KEYWORD_JSON "JSON" +#define PLUGINSD_KEYWORD_JSON_END "JSON_PAYLOAD_END" +#define PLUGINSD_KEYWORD_STREAM_PATH "STREAM_PATH" + typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, BUFFER *payload, HTTP_ACCESS access, const char *source, void *data); @@ -125,9 +133,13 @@ static inline void pluginsd_function_json_error_to_stdout(const char *transactio fflush(stdout); } -static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { - pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); +static inline void pluginsd_function_result_to_stdout(const char *transaction, BUFFER *result) { + pluginsd_function_result_begin_to_stdout(transaction, result->response_code, + content_type_id2string(result->content_type), + result->expires); + fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); + pluginsd_function_result_end_to_stdout(); fflush(stdout); } -- cgit v1.2.3