// SPDX-License-Identifier: GPL-3.0-or-later #include "functions_evloop.h" static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, BUFFER *payload, HTTP_ACCESS access, const char *source, void *data); struct functions_evloop_worker_job { bool used; bool running; bool cancelled; usec_t stop_monotonic_ut; char *cmd; const char *transaction; time_t timeout; BUFFER *payload; HTTP_ACCESS access; const char *source; functions_evloop_worker_execute_t cb; void *cb_data; }; static void worker_job_cleanup(struct functions_evloop_worker_job *j) { freez((void *)j->cmd); freez((void *)j->transaction); freez((void *)j->source); buffer_free(j->payload); } struct rrd_functions_expectation { const char *function; size_t function_length; functions_evloop_worker_execute_t cb; void *cb_data; time_t default_timeout; struct rrd_functions_expectation *prev, *next; }; struct functions_evloop_globals { const char *tag; DICTIONARY *worker_queue; pthread_mutex_t worker_mutex; pthread_cond_t worker_cond_var; size_t workers; netdata_mutex_t *stdout_mutex; bool *plugin_should_exit; bool workers_exit; // all workers are waiting on the same condition - this makes them all exit, when any is cancelled ND_THREAD *reader_thread; ND_THREAD **worker_threads; struct { DICTIONARY *nodes; } dyncfg; struct rrd_functions_expectation *expectations; }; static void rrd_functions_worker_canceller(void *data) { struct functions_evloop_globals *wg = data; pthread_mutex_lock(&wg->worker_mutex); wg->workers_exit = true; pthread_cond_signal(&wg->worker_cond_var); pthread_mutex_unlock(&wg->worker_mutex); } static void *rrd_functions_worker_globals_worker_main(void *arg) { struct functions_evloop_globals *wg = arg; nd_thread_register_canceller(rrd_functions_worker_canceller, wg); bool last_acquired = true; while (true) { pthread_mutex_lock(&wg->worker_mutex); if(wg->workers_exit || nd_thread_signaled_to_cancel()) { pthread_mutex_unlock(&wg->worker_mutex); break; } if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired) pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); const DICTIONARY_ITEM *acquired = NULL; struct functions_evloop_worker_job *j; dfe_start_write(wg->worker_queue, j) { if(j->running || j->cancelled) continue; acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); j->running = true; break; } dfe_done(j); pthread_mutex_unlock(&wg->worker_mutex); if(wg->workers_exit || nd_thread_signaled_to_cancel()) { if(acquired) dictionary_acquired_item_release(wg->worker_queue, acquired); break; } if(acquired) { ND_LOG_STACK lgs[] = { ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd), ND_LOG_FIELD_END(), }; ND_LOG_STACK_PUSH(lgs); last_acquired = true; j = dictionary_acquired_item_value(acquired); j->cb(j->transaction, j->cmd, &j->stop_monotonic_ut, &j->cancelled, j->payload, j->access, j->source, j->cb_data); dictionary_del(wg->worker_queue, j->transaction); dictionary_acquired_item_release(wg->worker_queue, acquired); dictionary_garbage_collect(wg->worker_queue); } else last_acquired = false; } return NULL; } static void worker_add_job(struct functions_evloop_globals *wg, const char *keyword, char *transaction, char *function, char *timeout_s, BUFFER *payload, const char *access, const char *source) { if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { nd_log(NDLS_COLLECTORS, NDLP_ERR, "Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", keyword, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); } else { int timeout = str2i(timeout_s); const char *msg = "No function with this name found"; bool found = false; struct rrd_functions_expectation *we; for(we = wg->expectations; we ;we = we->next) { if(strncmp(function, we->function, we->function_length) == 0) { if(timeout <= 0) timeout = (int)we->default_timeout; struct functions_evloop_worker_job t = { .cmd = strdupz(function), .transaction = strdupz(transaction), .running = false, .cancelled = false, .timeout = timeout, .stop_monotonic_ut = now_monotonic_usec() + (timeout * USEC_PER_SEC), .used = false, .payload = buffer_dup(payload), .access = http_access_from_hex(access), .source = source ? strdupz(source) : NULL, .cb = we->cb, .cb_data = we->cb_data, }; struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t)); if(j->used) { nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Received duplicate function transaction '%s'. Ignoring it.", transaction); worker_job_cleanup(&t); msg = "Duplicate function transaction. Ignoring it."; } else { found = true; j->used = true; pthread_mutex_lock(&wg->worker_mutex); pthread_cond_signal(&wg->worker_cond_var); pthread_mutex_unlock(&wg->worker_mutex); } } } if(!found) { netdata_mutex_lock(wg->stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, msg); netdata_mutex_unlock(wg->stdout_mutex); } } } static void *rrd_functions_worker_globals_reader_main(void *arg) { struct functions_evloop_globals *wg = arg; struct { size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc... bool enabled; char *transaction; char *function; char *timeout_s; char *access; char *source; char *content_type; } deferred = { 0 }; struct buffered_reader reader = { 0 }; buffered_reader_init(&reader); BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL); while(!(*wg->plugin_should_exit)) { if(unlikely(!buffered_reader_next_line(&reader, buffer))) { buffered_reader_ret_t ret = buffered_reader_read_timeout( &reader, fileno((FILE *)stdin), 2 * 60 * MSEC_PER_SEC, false ); if(unlikely(ret != BUFFERED_READER_READ_OK && ret != BUFFERED_READER_READ_POLL_TIMEOUT)) break; continue; } if(deferred.enabled) { char *s = (char *)buffer_tostring(buffer); if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) { if(deferred.last_len > 0) // remove the trailing newline from the buffer deferred.last_len--; s[deferred.last_len] = '\0'; buffer->len = deferred.last_len; buffer->content_type = content_type_string2id(deferred.content_type); worker_add_job(wg, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN, deferred.transaction, deferred.function, deferred.timeout_s, buffer, deferred.access, deferred.source); buffer_flush(buffer); freez(deferred.transaction); freez(deferred.function); freez(deferred.timeout_s); freez(deferred.access); freez(deferred.source); freez(deferred.content_type); memset(&deferred, 0, sizeof(deferred)); } else deferred.last_len = buffer->len; continue; } char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS); const char *keyword = get_word(words, num_words, 0); if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) { char *transaction = get_word(words, num_words, 1); char *timeout_s = get_word(words, num_words, 2); char *function = get_word(words, num_words, 3); char *access = get_word(words, num_words, 4); char *source = get_word(words, num_words, 5); worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source); } else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) { char *transaction = get_word(words, num_words, 1); char *timeout_s = get_word(words, num_words, 2); char *function = get_word(words, num_words, 3); char *access = get_word(words, num_words, 4); char *source = get_word(words, num_words, 5); char *content_type = get_word(words, num_words, 6); deferred.transaction = strdupz(transaction ? transaction : ""); deferred.timeout_s = strdupz(timeout_s ? timeout_s : ""); deferred.function = strdupz(function ? function : ""); deferred.access = strdupz(access ? access : ""); deferred.source = strdupz(source ? source : ""); deferred.content_type = strdupz(content_type ? content_type : ""); deferred.last_len = 0; deferred.enabled = true; } else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { char *transaction = get_word(words, num_words, 1); const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); if(acquired) { struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); dictionary_acquired_item_release(wg->worker_queue, acquired); dictionary_del(wg->worker_queue, transaction); dictionary_garbage_collect(wg->worker_queue); } else nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction); } else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { char *transaction = get_word(words, num_words, 1); const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); if(acquired) { struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut); dictionary_acquired_item_release(wg->worker_queue, acquired); } else nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction); } else nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)"); buffer_flush(buffer); } if(!(*wg->plugin_should_exit)) nd_log(NDLS_COLLECTORS, NDLP_ERR, "Read error on stdin"); *wg->plugin_should_exit = true; exit(1); } void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { struct functions_evloop_worker_job *j = value; worker_job_cleanup(j); } struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) { struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals)); wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL); wg->dyncfg.nodes = dyncfg_nodes_dictionary_create(); pthread_mutex_init(&wg->worker_mutex, NULL); pthread_cond_init(&wg->worker_cond_var, NULL); wg->plugin_should_exit = plugin_should_exit; wg->stdout_mutex = stdout_mutex; wg->workers = worker_threads; wg->worker_threads = callocz(wg->workers, sizeof(ND_THREAD *)); wg->tag = tag; char tag_buffer[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag); wg->reader_thread = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, rrd_functions_worker_globals_reader_main, wg); for(size_t i = 0; i < wg->workers ; i++) { snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1); wg->worker_threads[i] = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, rrd_functions_worker_globals_worker_main, wg); } functions_evloop_add_function(wg, "config", functions_evloop_config_cb, 120, wg); return wg; } void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data) { struct rrd_functions_expectation *we = callocz(1, sizeof(*we)); we->function = function; we->function_length = strlen(we->function); we->cb = cb; we->cb_data = data; we->default_timeout = default_timeout; DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next); } void functions_evloop_cancel_threads(struct functions_evloop_globals *wg) { nd_thread_signal_cancel(wg->reader_thread); for(size_t i = 0; i < wg->workers ; i++) nd_thread_signal_cancel(wg->worker_threads[i]); } // ---------------------------------------------------------------------------- static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, BUFFER *payload, HTTP_ACCESS access, const char *source, void *data) { struct functions_evloop_globals *wg = data; CLEAN_BUFFER *result = buffer_create(1024, NULL); int code = dyncfg_node_find_and_call(wg->dyncfg.nodes, transaction, function, stop_monotonic_ut, cancelled, payload, access, source, result); netdata_mutex_lock(wg->stdout_mutex); pluginsd_function_result_begin_to_stdout(transaction, code, content_type_id2string(result->content_type), result->expires); printf("%s", buffer_tostring(result)); pluginsd_function_result_end_to_stdout(); fflush(stdout); netdata_mutex_unlock(wg->stdout_mutex); } void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path, DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds, HTTP_ACCESS view_access, HTTP_ACCESS edit_access, dyncfg_cb_t cb, void *data) { if(!dyncfg_is_valid_id(id)) { nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); return; } struct dyncfg_node tmp = { .cmds = cmds, .type = type, .cb = cb, .data = data, }; dictionary_set(wg->dyncfg.nodes, id, &tmp, sizeof(tmp)); CLEAN_BUFFER *c = buffer_create(100, NULL); dyncfg_cmds2buffer(cmds, c); netdata_mutex_lock(wg->stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_CONFIG " '%s' " PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE " '%s' '%s' '%s' '%s' '%s' '%s' "HTTP_ACCESS_FORMAT" "HTTP_ACCESS_FORMAT"\n", id, dyncfg_id2status(status), dyncfg_id2type(type), path, dyncfg_id2source_type(source_type), source, buffer_tostring(c), (HTTP_ACCESS_FORMAT_CAST)view_access, (HTTP_ACCESS_FORMAT_CAST)edit_access ); fflush(stdout); netdata_mutex_unlock(wg->stdout_mutex); } void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id) { if(!dyncfg_is_valid_id(id)) { nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); return; } dictionary_del(wg->dyncfg.nodes, id); netdata_mutex_lock(wg->stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "\n", id); fflush(stdout); netdata_mutex_unlock(wg->stdout_mutex); } void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status) { if(!dyncfg_is_valid_id(id)) { nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); return; } netdata_mutex_lock(wg->stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS " %s\n", id, dyncfg_id2status(status)); fflush(stdout); netdata_mutex_unlock(wg->stdout_mutex); }