summaryrefslogtreecommitdiffstats
path: root/libnetdata/functions_evloop/functions_evloop.c
blob: 3fcd70aa1fcd181f287c87c949befd71bb72853c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// SPDX-License-Identifier: GPL-3.0-or-later

#include "functions_evloop.h"

#define MAX_FUNCTION_PARAMETERS 1024

struct functions_evloop_worker_job {
    bool used;
    bool running;
    bool cancelled;
    char *cmd;
    const char *transaction;
    time_t timeout;
    functions_evloop_worker_execute_t cb;
};

struct rrd_functions_expectation {
    const char *function;
    size_t function_length;
    functions_evloop_worker_execute_t cb;
    time_t default_timeout;
    struct rrd_functions_expectation *prev, *next;
};

struct functions_evloop_globals {
    const char *tag;

    DICTIONARY *worker_queue;
    pthread_mutex_t worker_mutex;
    pthread_cond_t worker_cond_var;
    size_t workers;

    netdata_mutex_t *stdout_mutex;
    bool *plugin_should_exit;

    netdata_thread_t reader_thread;
    netdata_thread_t *worker_threads;

    struct rrd_functions_expectation *expectations;
};

static void *rrd_functions_worker_globals_worker_main(void *arg) {
    struct functions_evloop_globals *wg = arg;

    bool last_acquired = true;
    while (true) {
        pthread_mutex_lock(&wg->worker_mutex);

        if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
            pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);

        const DICTIONARY_ITEM *acquired = NULL;
        struct functions_evloop_worker_job *j;
        dfe_start_write(wg->worker_queue, j) {
            if(j->running || j->cancelled)
                continue;

            acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
            j->running = true;
            break;
        }
        dfe_done(j);

        pthread_mutex_unlock(&wg->worker_mutex);

        if(acquired) {
            last_acquired = true;
            j = dictionary_acquired_item_value(acquired);
            j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
            dictionary_del(wg->worker_queue, j->transaction);
            dictionary_acquired_item_release(wg->worker_queue, acquired);
            dictionary_garbage_collect(wg->worker_queue);
        }
        else
            last_acquired = false;
    }
    return NULL;
}

static void *rrd_functions_worker_globals_reader_main(void *arg) {
    struct functions_evloop_globals *wg = arg;

    char buffer[PLUGINSD_LINE_MAX + 1];

    char *s = NULL;
    while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {

        char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
        size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);

        const char *keyword = get_word(words, num_words, 0);

        if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
            char *transaction = get_word(words, num_words, 1);
            char *timeout_s = get_word(words, num_words, 2);
            char *function = get_word(words, num_words, 3);

            if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
                netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
                                  keyword,
                                  transaction?transaction:"(unset)",
                                  timeout_s?timeout_s:"(unset)",
                                  function?function:"(unset)");
            }
            else {
                int timeout = str2i(timeout_s);

                bool found = false;
                struct rrd_functions_expectation *we;
                for(we = wg->expectations; we ;we = we->next) {
                    if(strncmp(function, we->function, we->function_length) == 0) {
                        struct functions_evloop_worker_job t = {
                                .cmd = strdupz(function),
                                .transaction = strdupz(transaction),
                                .running = false,
                                .cancelled = false,
                                .timeout = timeout > 0 ? timeout : we->default_timeout,
                                .used = false,
                                .cb = we->cb,
                        };
                        struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
                        if(j->used) {
                            netdata_log_error("Received duplicate function transaction '%s'", transaction);
                            freez((void *)t.cmd);
                            freez((void *)t.transaction);
                        }
                        else {
                            found = true;
                            j->used = true;
                            pthread_cond_signal(&wg->worker_cond_var);
                        }
                    }
                }

                if(!found) {
                    netdata_mutex_lock(wg->stdout_mutex);
                    pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
                                                           "No function with this name found.");
                    netdata_mutex_unlock(wg->stdout_mutex);
                }
            }
        }
        else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
            char *transaction = get_word(words, num_words, 1);
            const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
            if(acquired) {
                struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
                __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
                dictionary_acquired_item_release(wg->worker_queue, acquired);
                dictionary_del(wg->worker_queue, transaction);
                dictionary_garbage_collect(wg->worker_queue);
            }
            else
                netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
        }
        else
            netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
    }

    if(!s || feof(stdin) || ferror(stdin)) {
        *wg->plugin_should_exit = true;
        netdata_log_error("Received error on stdin.");
    }

    exit(1);
}

void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
    struct functions_evloop_worker_job *j = value;
    freez((void *)j->cmd);
    freez((void *)j->transaction);
}

struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
    struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));

    wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
    dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);

    pthread_mutex_init(&wg->worker_mutex, NULL);
    pthread_cond_init(&wg->worker_cond_var, NULL);

    wg->plugin_should_exit = plugin_should_exit;
    wg->stdout_mutex = stdout_mutex;
    wg->workers = worker_threads;
    wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
    wg->tag = tag;

    char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
    snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
    netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
                          rrd_functions_worker_globals_reader_main, wg);

    for(size_t i = 0; i < wg->workers ; i++) {
        snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
        netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
                              rrd_functions_worker_globals_worker_main, wg);
    }

    return wg;
}

void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
    struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
    we->function = function;
    we->function_length = strlen(we->function);
    we->cb = cb;
    we->default_timeout = default_timeout;
    DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
}