summaryrefslogtreecommitdiffstats
path: root/spawn/spawn.c
blob: a9359da04e7d35d8420782699266ac1a0ce78d01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// SPDX-License-Identifier: GPL-3.0-or-later

#include "spawn.h"

/* Handle of the spawn client thread that spawn_init() creates and, on failure, joins. */
static uv_thread_t thread;
/* Checked by spawn_init() after the client thread reports initialization; non-zero is treated as failure. */
int spawn_thread_error;
/* NOTE(review): not referenced in this part of the file; presumably a shutdown request flag for the client thread - confirm. */
int spawn_thread_shutdown;

/* Outstanding spawn commands, kept in an AVL tree ordered by serial; set up by init_spawn_cmd_queue(). */
struct spawn_queue spawn_cmd_queue;

/*
 * Allocates a command descriptor for command_to_run.
 *
 * The descriptor's condition variable and mutex are initialized (asserted with fatal_assert), the command string is
 * duplicated, and serial / exit_status / pid start at their "invalid" markers with no flags set.
 * Release the result with destroy_spawn_cmd().
 */
static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run)
{
    struct spawn_cmd_info *cmd = mallocz(sizeof(*cmd));

    /* synchronization primitives used by the waiter of this command */
    fatal_assert(0 == uv_cond_init(&cmd->cond));
    fatal_assert(0 == uv_mutex_init(&cmd->mutex));

    /* 0 (serial) and -1 (exit status, pid) mark values that have not been assigned yet */
    cmd->serial = 0;
    cmd->command_to_run = strdupz(command_to_run);
    cmd->exit_status = -1;
    cmd->pid = -1;
    cmd->flags = 0;

    return cmd;
}

/*
 * Releases everything owned by a command descriptor: the duplicated command string, the mutex and condition
 * variable, and finally the descriptor itself. cmdinfo must not be used after this call.
 */
void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
{
    freez(cmdinfo->command_to_run);

    uv_mutex_destroy(&cmdinfo->mutex);
    uv_cond_destroy(&cmdinfo->cond);

    freez(cmdinfo);
}

int spawn_cmd_compare(void *a, void *b)
{
    struct spawn_cmd_info *cmda = a, *cmdb = b;

    /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
    if (cmda->serial < cmdb->serial) return -1;
    if (cmda->serial > cmdb->serial) return 1;

    return 0;
}

static void init_spawn_cmd_queue(void)
{
    spawn_cmd_queue.cmd_tree.root = NULL;
    spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
    spawn_cmd_queue.size = 0;
    spawn_cmd_queue.latest_serial = 0;
    fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
    fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
}

/*
 * Returns serial number of the enqueued command
 */
uint64_t spawn_enq_cmd(const char *command_to_run)
{
    unsigned queue_size;
    uint64_t serial;
    avl_t *avl_ret;
    struct spawn_cmd_info *cmdinfo;

    cmdinfo = create_spawn_cmd(command_to_run);

    /* wait for free space in queue */
    uv_mutex_lock(&spawn_cmd_queue.mutex);
    while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
        uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
    }
    fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
    spawn_cmd_queue.size = queue_size + 1;

    serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
    cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */

    /* enqueue command */
    avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
    fatal_assert(avl_ret == (avl_t *)cmdinfo);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    /* wake up event loop */
    fatal_assert(0 == uv_async_send(&spawn_async));
    return serial;
}

/*
 * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
 */
void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
{
    avl_t *avl_ret;
    struct spawn_cmd_info tmp, *cmdinfo;

    tmp.serial = serial;

    uv_mutex_lock(&spawn_cmd_queue.mutex);
    avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
    cmdinfo = (struct spawn_cmd_info *)avl_ret;

    uv_mutex_lock(&cmdinfo->mutex);
    while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
        /* Only 1 thread is allowed to wait for this command to finish */
        uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
    }
    uv_mutex_unlock(&cmdinfo->mutex);

    spawn_deq_cmd(cmdinfo);
    *exit_status = cmdinfo->exit_status;
    *exec_run_timestamp = cmdinfo->exec_run_timestamp;

    destroy_spawn_cmd(cmdinfo);
}

void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
{
    unsigned queue_size;
    avl_t *avl_ret;

    uv_mutex_lock(&spawn_cmd_queue.mutex);
    queue_size = spawn_cmd_queue.size;
    fatal_assert(queue_size);
    /* dequeue command */
    avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
    fatal_assert(avl_ret);

    spawn_cmd_queue.size = queue_size - 1;

    /* wake up callers */
    uv_cond_signal(&spawn_cmd_queue.cond);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);
}

/*
 * Tree traversal callback that stops at the first entry not yet flagged SPAWN_CMD_PROCESSED and stores it in the
 * struct spawn_cmd_info pointer that data points to.
 *
 * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is
 * the only writer as far as struct spawn_cmd_info entries are concerned.
 *
 * Returns -1 to break the traversal when an entry was stored, 0 to continue.
 */
static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
{
    struct spawn_cmd_info *candidate = entry;
    struct spawn_cmd_info **result = data;

    if (candidate->flags & SPAWN_CMD_PROCESSED) {
        return 0; /* already processed, keep traversing */
    }

    *result = candidate;
    return -1; /* found one, break tree traversal */
}

/*
 * Looks up a queued command that is not yet flagged SPAWN_CMD_PROCESSED.
 * The queue mutex is held for the duration of the search.
 *
 * Returns the command found by the traversal, or NULL when the queue is empty or every queued command has
 * already been processed.
 */
struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
{
    struct spawn_cmd_info *found = NULL;

    uv_mutex_lock(&spawn_cmd_queue.mutex);

    if (spawn_cmd_queue.size != 0) {
        int rc = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&found);

        /* the callback breaks the traversal with -1 only when it stored an entry */
        if (rc != -1) {
            found = NULL; /* no commands available for processing */
        }
    }

    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    return found;
}

/**
 * Spawns the spawn server: a child process running the netdata executable (exepath) with
 * SPAWN_SERVER_COMMAND_LINE_ARGUMENT, sharing a libuv IPC pipe with the caller.
 * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
 * The caller has to be the netdata user as configured.
 *
 * Child stdio layout: descriptor 0 is the newly created bidirectional pipe, descriptors 1 and 2 are inherited
 * from the caller.
 *
 * @param loop the libuv loop of the caller context
 * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share
 * @param process the spawn server libuv process context
 * @return 0 on success or the libuv error code (a failure is logged and then reported through fatal())
 */
int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
{
#define SPAWN_SERVER_DESCRIPTORS (3)
    uv_stdio_container_t child_stdio[SPAWN_SERVER_DESCRIPTORS];
    uv_process_options_t options;
    char *child_args[3];
    struct passwd *pw;
    const char *user = "";
    int rc;

    /* the user name is only needed for the error message below */
    pw = getpwuid(getuid());
    if (pw && pw->pw_name) {
        user = pw->pw_name;
    }

    child_args[0] = exepath;
    child_args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
    child_args[2] = NULL;

    /* descriptor 0: bidirectional libuv pipe shared with the server */
    child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    child_stdio[0].data.stream = (uv_stream_t *)spawn_channel;
    /* descriptor 1: inherit stdout (UV_STDOUT_FD) */
    child_stdio[1].flags = UV_INHERIT_FD;
    child_stdio[1].data.fd = 1;
    /* descriptor 2: inherit stderr (UV_STDERR_FD) */
    child_stdio[2].flags = UV_INHERIT_FD;
    child_stdio[2].data.fd = 2;

    memset(&options, 0, sizeof(options));
    options.exit_cb = NULL; /* no exit callback is registered */
    options.file = exepath;
    options.args = child_args;
    options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
    options.stdio = child_stdio;

    /* execute the netdata binary again as the netdata user */
    rc = uv_spawn(loop, process, &options);
    if (rc != 0) {
        netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", exepath, user, uv_strerror(rc));
        fatal("Cannot start netdata without the spawn server.");
    }

    return rc;
}

/*
 * Parameters of the optional stress test embedded in spawn_init():
 * CONCURRENT_SPAWNS is the number of commands enqueued before waiting for them in each iteration,
 * SPAWN_ITERATIONS is the number of such iterations.
 * The test is compiled only when CONCURRENT_STRESS_TEST is defined; the #undef below keeps it disabled.
 */
#define CONCURRENT_SPAWNS 16
#define SPAWN_ITERATIONS  10000
#undef CONCURRENT_STRESS_TEST

/*
 * Initializes the spawn client: sets up the global command queue, starts the spawn client thread (spawn_client)
 * and waits until that thread reports, through a completion, that it has initialized.
 *
 * Failures are not fatal: if the thread cannot be created, or spawn_thread_error is set once the thread has
 * reported, an error is logged and the function returns without a usable spawn service.
 *
 * When CONCURRENT_STRESS_TEST is defined, a stress test is run after initialization and the process exits.
 */
void spawn_init(void)
{
    struct completion completion;
    int error;

    netdata_log_info("Initializing spawn client.");

    init_spawn_cmd_queue();

    completion_init(&completion);
    error = uv_thread_create(&thread, spawn_client, &completion);
    if (error) {
        netdata_log_error("uv_thread_create(): %s", uv_strerror(error));
        /* the thread was never started, so nothing else references the completion: release it here */
        completion_destroy(&completion);
        goto after_error;
    }
    /* wait for spawn client thread to initialize */
    completion_wait_for(&completion);
    completion_destroy(&completion);
    uv_thread_set_name_np(thread, "DAEMON_SPAWN");

    if (spawn_thread_error) {
        /* the client thread reported a failure; reap it before giving up */
        error = uv_thread_join(&thread);
        if (error) {
            netdata_log_error("uv_thread_join(): %s", uv_strerror(error));
        }
        goto after_error;
    }
#ifdef CONCURRENT_STRESS_TEST
    signals_reset();
    signals_unblock();

    sleep(60);
    uint64_t serial[CONCURRENT_SPAWNS];
    for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
        /* enqueue a full batch of commands ... */
        for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
            char cmd[64];
            snprintf(cmd, sizeof(cmd), "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
            serial[i] = spawn_enq_cmd(cmd);
            netdata_log_info("Queued command %s for spawning.", cmd);
        }
        /* ... then wait for each of them; start from defined values because they are logged before the wait */
        int exit_status = -1;
        time_t exec_run_timestamp = 0;
        for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
            netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.",
                 (unsigned long long)serial[i], exit_status, (unsigned long long)exec_run_timestamp);
            spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
            netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.",
                 (unsigned long long)serial[i], exit_status, (unsigned long long)exec_run_timestamp);
        }
    }
    exit(0);
#endif
    return;

    after_error:
        netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
}