summaryrefslogtreecommitdiffstats
path: root/spawn/spawn_client.c
blob: 8928a468c6bc9840dcaf7de9e34dce8e8a10d06d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// SPDX-License-Identifier: GPL-3.0-or-later

#include "spawn.h"

static uv_process_t process;
static uv_pipe_t spawn_channel;
static uv_loop_t *loop;
uv_async_t spawn_async;

static char prot_buffer[MAX_COMMAND_LENGTH];
static unsigned prot_buffer_len = 0;

static void async_cb(uv_async_t *handle)
{
    uv_stop(handle->loop);
}

static void after_pipe_write(uv_write_t* req, int status)
{
    (void)status;
#ifdef SPAWN_DEBUG
    netdata_log_info("CLIENT %s called status=%d", __func__, status);
#endif
    void **data = req->data;
    freez(data[0]);
    freez(data[1]);
    freez(data);
}

static void client_parse_spawn_protocol(unsigned source_len, char *source)
{
    unsigned required_len;
    struct spawn_prot_header *header;
    struct spawn_prot_spawn_result *spawn_result;
    struct spawn_prot_cmd_exit_status *exit_status;
    struct spawn_cmd_info *cmdinfo;

    while (source_len) {
        required_len = sizeof(*header);
        if (prot_buffer_len < required_len)
            copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
        if (prot_buffer_len < required_len)
            return; /* Source buffer ran out */

        header = (struct spawn_prot_header *)prot_buffer;
        cmdinfo = (struct spawn_cmd_info *)header->handle;
        fatal_assert(NULL != cmdinfo);

        switch(header->opcode) {
        case SPAWN_PROT_SPAWN_RESULT:
            required_len += sizeof(*spawn_result);
            if (prot_buffer_len < required_len)
                copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
            if (prot_buffer_len < required_len)
                return; /* Source buffer ran out */

            spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
            uv_mutex_lock(&cmdinfo->mutex);
            cmdinfo->pid = spawn_result->exec_pid;
            if (0 == cmdinfo->pid) { /* Failed to spawn */
#ifdef SPAWN_DEBUG
                netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
#endif
                cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
                uv_cond_signal(&cmdinfo->cond);
            } else {
                cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
                cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
#ifdef SPAWN_DEBUG
                netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
#endif
            }
            uv_mutex_unlock(&cmdinfo->mutex);
            prot_buffer_len = 0;
            break;
        case SPAWN_PROT_CMD_EXIT_STATUS:
            required_len += sizeof(*exit_status);
            if (prot_buffer_len < required_len)
                copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
            if (prot_buffer_len < required_len)
                return; /* Source buffer ran out */

            exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
            uv_mutex_lock(&cmdinfo->mutex);
            cmdinfo->exit_status = exit_status->exec_exit_status;
#ifdef SPAWN_DEBUG
            netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
#endif
            cmdinfo->flags |= SPAWN_CMD_DONE;
            uv_cond_signal(&cmdinfo->cond);
            uv_mutex_unlock(&cmdinfo->mutex);
            prot_buffer_len = 0;
            break;
        default:
            fatal_assert(0);
            break;
        }

    }
}

static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
{
    if (0 == nread) {
        netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__);
    } else if (UV_EOF == nread) {
        netdata_log_info("EOF found in spawn pipe.");
    } else if (nread < 0) {
        netdata_log_error("%s: %s", __func__, uv_strerror(nread));
    }

    if (nread < 0) { /* stop stream due to EOF or error */
        (void)uv_read_stop((uv_stream_t *)pipe);
    } else if (nread) {
#ifdef SPAWN_DEBUG
        netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread);
#endif
        client_parse_spawn_protocol(nread, buf->base);
    }
    if (buf && buf->len) {
        freez(buf->base);
    }

    if (nread < 0) {
        uv_close((uv_handle_t *)pipe, NULL);
    }
}

static void on_read_alloc(uv_handle_t* handle,
                          size_t suggested_size,
                          uv_buf_t* buf)
{
    (void)handle;
    buf->base = mallocz(suggested_size);
    buf->len = suggested_size;
}

static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
{
    int ret;
    uv_buf_t *writebuf;
    struct write_context *write_ctx;

    void **data = callocz(2, sizeof(void *));
    writebuf = callocz(3, sizeof(uv_buf_t));
    write_ctx = callocz(1, sizeof(*write_ctx));

    data[0] = write_ctx;
    data[1] = writebuf;
    write_ctx->write_req.data = data;

    uv_mutex_lock(&cmdinfo->mutex);
    cmdinfo->flags |= SPAWN_CMD_PROCESSED;
    uv_mutex_unlock(&cmdinfo->mutex);

    write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
    write_ctx->header.handle = cmdinfo;
    write_ctx->payload.command_length = strlen(cmdinfo->command_to_run);

    writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
    writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload));
    writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length);

#ifdef SPAWN_DEBUG
    netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
#endif
    ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write);
    fatal_assert(ret == 0);
}

void spawn_client(void *arg)
{
    int ret;
    struct completion *completion = (struct completion *)arg;

    loop = mallocz(sizeof(uv_loop_t));
    ret = uv_loop_init(loop);
    if (ret) {
        netdata_log_error("uv_loop_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_loop_init;
    }
    loop->data = NULL;

    spawn_async.data = NULL;
    ret = uv_async_init(loop, &spawn_async, async_cb);
    if (ret) {
        netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_async_init;
    }

    ret = uv_pipe_init(loop, &spawn_channel, 1);
    if (ret) {
        netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_pipe_init;
    }
    fatal_assert(spawn_channel.ipc);

    ret = create_spawn_server(loop, &spawn_channel, &process);
    if (ret) {
        netdata_log_error("Failed to fork spawn server process.");
        spawn_thread_error = ret;
        goto error_after_spawn_server;
    }

    spawn_thread_error = 0;
    spawn_thread_shutdown = 0;
    /* wake up initialization thread */
    completion_mark_complete(completion);

    prot_buffer_len = 0;
    ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
    fatal_assert(ret == 0);

    while (spawn_thread_shutdown == 0) {
        struct spawn_cmd_info *cmdinfo;

        uv_run(loop, UV_RUN_DEFAULT);
        while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
            spawn_process_cmd(cmdinfo);
        }
    }
    /* cleanup operations of the event loop */
    netdata_log_info("Shutting down spawn client event loop.");
    uv_close((uv_handle_t *)&spawn_channel, NULL);
    uv_close((uv_handle_t *)&spawn_async, NULL);
    uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */

    netdata_log_info("Shutting down spawn client loop complete.");
    fatal_assert(0 == uv_loop_close(loop));

    return;

error_after_spawn_server:
    uv_close((uv_handle_t *)&spawn_channel, NULL);
error_after_pipe_init:
    uv_close((uv_handle_t *)&spawn_async, NULL);
error_after_async_init:
    uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
    fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
    freez(loop);

    /* wake up initialization thread */
    completion_mark_complete(completion);
}