summaryrefslogtreecommitdiffstats
path: root/src/libnetdata/functions_evloop
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnetdata/functions_evloop')
-rw-r--r--src/libnetdata/functions_evloop/functions_evloop.c12
-rw-r--r--src/libnetdata/functions_evloop/functions_evloop.h16
2 files changed, 24 insertions, 4 deletions
diff --git a/src/libnetdata/functions_evloop/functions_evloop.c b/src/libnetdata/functions_evloop/functions_evloop.c
index 5000d038f..fd0061844 100644
--- a/src/libnetdata/functions_evloop/functions_evloop.c
+++ b/src/libnetdata/functions_evloop/functions_evloop.c
@@ -137,6 +137,8 @@ static void worker_add_job(struct functions_evloop_globals *wg, const char *keyw
function?function:"(unset)");
}
else {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "WORKER JOB WITH PAYLOAD '%s'", payload ? buffer_tostring(payload) : "NONE");
+
int timeout = str2i(timeout_s);
const char *msg = "No function with this name found";
@@ -222,6 +224,8 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
char *s = (char *)buffer_tostring(buffer);
if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD END");
+
if(deferred.last_len > 0)
// remove the trailing newline from the buffer
deferred.last_len--;
@@ -249,11 +253,12 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
}
char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS);
+ size_t num_words = quoted_strings_splitter_whitespace((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS);
const char *keyword = get_word(words, num_words, 0);
if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CALL");
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -262,6 +267,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source);
}
else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PAYLOAD CALL");
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -279,6 +285,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
deferred.enabled = true;
}
else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION CANCEL");
char *transaction = get_word(words, num_words, 1);
const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
if(acquired) {
@@ -292,6 +299,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction);
}
else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) {
+ // nd_log(NDLS_COLLECTORS, NDLP_INFO, "FUNCTION PROGRESS");
char *transaction = get_word(words, num_words, 1);
const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
if(acquired) {
@@ -305,7 +313,7 @@ static void *rrd_functions_worker_globals_reader_main(void *arg) {
nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction);
}
else
- nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)");
+ nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword ? keyword : "(unset)");
buffer_flush(buffer);
}
diff --git a/src/libnetdata/functions_evloop/functions_evloop.h b/src/libnetdata/functions_evloop/functions_evloop.h
index 5c575bd17..35defe355 100644
--- a/src/libnetdata/functions_evloop/functions_evloop.h
+++ b/src/libnetdata/functions_evloop/functions_evloop.h
@@ -71,6 +71,14 @@
#define PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS "status"
#define PLUGINSD_FUNCTION_CONFIG "config"
+// claiming
+#define PLUGINSD_KEYWORD_NODE_ID "NODE_ID"
+#define PLUGINSD_KEYWORD_CLAIMED_ID "CLAIMED_ID"
+
+#define PLUGINSD_KEYWORD_JSON "JSON"
+#define PLUGINSD_KEYWORD_JSON_END "JSON_PAYLOAD_END"
+#define PLUGINSD_KEYWORD_STREAM_PATH "STREAM_PATH"
+
typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, usec_t *stop_monotonic_ut,
bool *cancelled, BUFFER *payload, HTTP_ACCESS access,
const char *source, void *data);
@@ -125,9 +133,13 @@ static inline void pluginsd_function_json_error_to_stdout(const char *transactio
fflush(stdout);
}
-static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
- pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
+static inline void pluginsd_function_result_to_stdout(const char *transaction, BUFFER *result) {
+ pluginsd_function_result_begin_to_stdout(transaction, result->response_code,
+ content_type_id2string(result->content_type),
+ result->expires);
+
fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
+
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}