summaryrefslogtreecommitdiffstats
path: root/src/libnetdata/functions_evloop/functions_evloop.h
blob: 5c575bd1786b5d691771941d986ff526d0eff128 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_FUNCTIONS_EVLOOP_H
#define NETDATA_FUNCTIONS_EVLOOP_H

#include "../libnetdata.h"

// NOTE(review): presumably the upper bound on the number of parameters parsed
// from a function call; it is not referenced in this header - confirm against its users.
#define MAX_FUNCTION_PARAMETERS 1024
// NOTE(review): presumably the fallback used when a function is registered
// without an explicit timeout - confirm against its users. The unit is seconds.
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds

// All PLUGINSD_KEYWORD_* macros are string literals, so they can be concatenated
// at compile time with adjacent literals (e.g. inside printf-style format strings).

// plugins.d 1st version of the external plugins and streaming protocol
#define PLUGINSD_KEYWORD_CHART                  "CHART"
#define PLUGINSD_KEYWORD_CHART_DEFINITION_END   "CHART_DEFINITION_END"
#define PLUGINSD_KEYWORD_DIMENSION              "DIMENSION"
#define PLUGINSD_KEYWORD_BEGIN                  "BEGIN"
#define PLUGINSD_KEYWORD_SET                    "SET"
#define PLUGINSD_KEYWORD_END                    "END"
#define PLUGINSD_KEYWORD_FLUSH                  "FLUSH"
#define PLUGINSD_KEYWORD_DISABLE                "DISABLE"
#define PLUGINSD_KEYWORD_VARIABLE               "VARIABLE"
#define PLUGINSD_KEYWORD_LABEL                  "LABEL"
#define PLUGINSD_KEYWORD_OVERWRITE              "OVERWRITE"
#define PLUGINSD_KEYWORD_CLABEL                 "CLABEL"
#define PLUGINSD_KEYWORD_CLABEL_COMMIT          "CLABEL_COMMIT"
#define PLUGINSD_KEYWORD_EXIT                   "EXIT"

// high-speed versions of BEGIN, SET, END
// (same names as the v1 keywords, with a "2" suffix)
#define PLUGINSD_KEYWORD_BEGIN_V2               "BEGIN2"
#define PLUGINSD_KEYWORD_SET_V2                 "SET2"
#define PLUGINSD_KEYWORD_END_V2                 "END2"

// super high-speed versions of BEGIN, SET, END have this as first parameter
// enabled with the streaming capability STREAM_CAP_SLOTS
#define PLUGINSD_KEYWORD_SLOT                   "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too

// virtual hosts (only for external plugins - for streaming virtual hosts are like all other hosts)
// HOST_DEFINE / HOST_DEFINE_END are distinct tokens that bracket a definition by name;
// HOST_LABEL and HOST are separate keywords.
#define PLUGINSD_KEYWORD_HOST_DEFINE            "HOST_DEFINE"
#define PLUGINSD_KEYWORD_HOST_DEFINE_END        "HOST_DEFINE_END"
#define PLUGINSD_KEYWORD_HOST_LABEL             "HOST_LABEL"
#define PLUGINSD_KEYWORD_HOST                   "HOST"

// replication
// enabled with STREAM_CAP_REPLICATION
// Note that the macro names spell out REPLAY_*, but most of the tokens on the
// wire are abbreviated (RBEGIN, RSET, RDSTATE, RSSTATE, REND); only REPLAY_CHART
// is sent with its full name.
#define PLUGINSD_KEYWORD_REPLAY_CHART           "REPLAY_CHART"
#define PLUGINSD_KEYWORD_REPLAY_BEGIN           "RBEGIN"
#define PLUGINSD_KEYWORD_REPLAY_SET             "RSET"
#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE    "RDSTATE"
#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE    "RSSTATE"
#define PLUGINSD_KEYWORD_REPLAY_END             "REND"

// plugins.d accepts these for functions (from external plugins or streaming children)
// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS
#define PLUGINSD_KEYWORD_FUNCTION               "FUNCTION"                  // define a function
#define PLUGINSD_KEYWORD_FUNCTION_PROGRESS      "FUNCTION_PROGRESS"         // send updates about function progress
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN  "FUNCTION_RESULT_BEGIN"     // the result of a function transaction
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END    "FUNCTION_RESULT_END"       // the end of the result of a func. trans.

// plugins.d sends these for functions (to external plugins or streaming children)
// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS
//
// PLUGINSD_CALL_FUNCTION and PLUGINSD_CALL_FUNCTION_PROGRESS expand to exactly the
// same tokens as PLUGINSD_KEYWORD_FUNCTION and PLUGINSD_KEYWORD_FUNCTION_PROGRESS
// above: the token is shared and only the direction of the message differs
// (define vs. call, progress report vs. "user is waiting").
// PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN expands to "FUNCTION_PAYLOAD" - there is no
// "_BEGIN" suffix on the wire, although the closing token does carry "_END".
#define PLUGINSD_CALL_FUNCTION                  "FUNCTION"                  // call a function to a plugin or remote host
#define PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN    "FUNCTION_PAYLOAD"          // call a function with a payload
#define PLUGINSD_CALL_FUNCTION_PAYLOAD_END      "FUNCTION_PAYLOAD_END"      // function payload ends
#define PLUGINSD_CALL_FUNCTION_CANCEL           "FUNCTION_CANCEL"           // cancel a running function transaction
#define PLUGINSD_CALL_FUNCTION_PROGRESS         "FUNCTION_PROGRESS"         // let the function know the user is waiting

// dyncfg
// enabled with STREAM_CAP_DYNCFG
// The CONFIG keyword is uppercase like the other protocol keywords, while the
// action names that accompany it (create / delete / status) are lowercase.
#define PLUGINSD_KEYWORD_CONFIG                 "CONFIG"
#define PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE   "create"
#define PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE   "delete"
#define PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS   "status"
// NOTE(review): presumably the name of the function through which dyncfg
// requests are delivered - confirm against its users (not referenced in this header).
#define PLUGINSD_FUNCTION_CONFIG                "config"

// Signature of the callback that executes a function; this is the type of the
// `cb` argument of functions_evloop_add_function().
//
//   transaction       - identifier of the function transaction
//   function          - the function call text (non-const, so the callback may
//                       modify it in place - NOTE(review): confirm this is intended)
//   stop_monotonic_ut - pointer to a usec_t deadline; same pointer type that
//                       functions_stop_monotonic_update_on_progress() updates
//   cancelled         - pointer to a bool flag
//                       NOTE(review): presumably set by the event loop when the
//                       transaction is cancelled - confirm
//   payload           - BUFFER with the payload of the call
//                       NOTE(review): presumably NULL when the call has no payload - confirm
//   access            - HTTP_ACCESS value associated with the call
//   source            - string describing the origin of the call
//   data              - opaque pointer
//                       NOTE(review): presumably the `data` given at registration - confirm
typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, usec_t *stop_monotonic_ut,
                                                  bool *cancelled, BUFFER *payload, HTTP_ACCESS access,
                                                  const char *source, void *data);

// Forward declaration only; the structure is not defined in this header.
struct functions_evloop_worker_job;

// Create the functions event loop and return the handle (`wg`) that every other
// functions_evloop_*() call in this header takes as its first argument.
// The implementation is not visible here, so the notes below are assumptions:
//   worker_threads     - NOTE(review): presumably the number of worker threads to start - confirm
//   tag                - NOTE(review): presumably a label for the created threads - confirm
//   stdout_mutex       - NOTE(review): presumably the mutex serializing writes to stdout - confirm
//   plugin_should_exit - NOTE(review): presumably a flag signalling that the plugin must exit - confirm
struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit);

// Register callback `cb` for the function named `function` on event loop `wg`.
// `default_timeout` is a time_t (NOTE(review): presumably seconds - confirm);
// `data` is an opaque pointer (NOTE(review): presumably handed back to `cb` - confirm).
void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data);

// Cancel the threads of event loop `wg`.
// NOTE(review): whether this waits for the threads to finish is not visible here - confirm.
void functions_evloop_cancel_threads(struct functions_evloop_globals *wg);

// How far past "now" the deadline is pushed when a PROGRESS update arrives.
#define FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT (10 * USEC_PER_SEC)

// Handle a PROGRESS update for a running function.
//
// If the deadline stored in *stop_monotonic_ut is earlier than
// now + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT, it is moved to exactly that
// point; otherwise it is left untouched. Either way a debug line is logged.
//
// The deadline is written with an atomic store, so it is read with an atomic
// load too: mixing a plain read with an atomic write of the same object is a
// data race.
// NOTE(review): the atomic store suggests another thread reads the deadline - confirm.
static inline void functions_stop_monotonic_update_on_progress(usec_t *stop_monotonic_ut) {
    // compute the candidate deadline once, so the comparison and the store agree
    usec_t extended_ut = now_monotonic_usec() + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT;
    usec_t current_ut = __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED);

    if(extended_ut > current_ut) {
        nd_log(NDLS_DAEMON, NDLP_DEBUG, "Extending function timeout due to PROGRESS update...");
        __atomic_store_n(stop_monotonic_ut, extended_ut, __ATOMIC_RELAXED);
    }
    else
        nd_log(NDLS_DAEMON, NDLP_DEBUG, "Received PROGRESS update...");
}

// Append the FUNCTION_RESULT_BEGIN header line to buffer `wb`:
//   FUNCTION_RESULT_BEGIN "<transaction>" <code> "<content_type>" <expires>\n
// A NULL `transaction` or `content_type` is emitted as an empty string.
// `code` is cast to int and `expires` to long int to match the format string.
// `transaction` and `content_type` are evaluated twice, so do not pass
// expressions with side effects.
#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires)      \
    buffer_sprintf(wb                                                                               \
                    , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n"              \
                    , (transaction) ? (transaction) : ""                                            \
                    , (int)(code)                                                                   \
                    , (content_type) ? (content_type) : ""                                          \
                    , (long int)(expires)                                                           \
    )

// Append the FUNCTION_RESULT_END trailer to buffer `wb`.
// A newline is emitted before the keyword, so the keyword always starts on a
// new line, and another newline terminates it.
#define pluginsd_function_result_end_to_buffer(wb) \
    buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")

// Print the FUNCTION_RESULT_BEGIN header line to stdout:
//   FUNCTION_RESULT_BEGIN "<transaction>" <code> "<content_type>" <expires>\n
// A NULL `transaction` or `content_type` is emitted as an empty string.
// `code` is cast to int and `expires` to long int to match the format string.
// `transaction` and `content_type` are evaluated twice, so do not pass
// expressions with side effects. The macro does not flush stdout and does not
// take any lock.
#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires)          \
    fprintf(stdout                                                                                  \
                    , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n"              \
                    , (transaction) ? (transaction) : ""                                            \
                    , (int)(code)                                                                   \
                    , (content_type) ? (content_type) : ""                                          \
                    , (long int)(expires)                                                           \
    )

// Print the FUNCTION_RESULT_END trailer to stdout.
// A newline is emitted before the keyword, so the keyword always starts on a
// new line, and another newline terminates it. The macro does not flush stdout.
#define pluginsd_function_result_end_to_stdout() \
    fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")

// Print a complete function result to stdout whose body is a JSON error object:
//   {"status":<code>,"error_message":"<msg, JSON-escaped>"}
// wrapped between the FUNCTION_RESULT_BEGIN header (content type
// "application/json", expires set to the current realtime second) and the
// FUNCTION_RESULT_END trailer; stdout is flushed at the end.
//
//   transaction - transaction id; NULL is printed as an empty string
//   code        - status code, used both in the header and in the JSON body
//   msg         - error text; NULL is treated as an empty string, in line with
//                 how the header macro treats a NULL transaction/content type
//
// The escaped message is limited to PLUGINSD_LINE_MAX bytes.
// This function takes no lock.
// NOTE(review): callers presumably serialize access to stdout themselves - confirm.
static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
    char buffer[PLUGINSD_LINE_MAX + 1];

    // never hand a NULL source string to the escaper
    json_escape_string(buffer, msg ? msg : "", PLUGINSD_LINE_MAX);

    pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
    fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
    pluginsd_function_result_end_to_stdout();
    fflush(stdout);
}

// Print a complete function result to stdout: the FUNCTION_RESULT_BEGIN header
// line, the contents of `result` written verbatim, and the FUNCTION_RESULT_END
// trailer. stdout is flushed once everything has been written.
// This function takes no lock.
static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
    // header line
    pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);

    // body: the whole buffer is written as a single item
    const char *body = buffer_tostring(result);
    size_t body_len = buffer_strlen(result);
    fwrite(body, body_len, 1, stdout);

    // trailer, then push everything out
    pluginsd_function_result_end_to_stdout();
    fflush(stdout);
}

// Print a FUNCTION_PROGRESS line to stdout and flush it:
//   FUNCTION_PROGRESS '<transaction>' <done> <all>\n
//
//   transaction - transaction id, printed between single quotes; NULL is
//                 printed as an empty string (passing NULL to %s is undefined
//                 behaviour), matching the NULL handling of the
//                 FUNCTION_RESULT_BEGIN macros
//   done, all   - progress counters, printed as unsigned decimals
//
// This function takes no lock.
static inline void pluginsd_function_progress_to_stdout(const char *transaction, size_t done, size_t all) {
    fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION_PROGRESS " '%s' %zu %zu\n",
            transaction ? transaction : "", done, all);
    fflush(stdout);
}

// Write a single newline to stdout and flush it, holding `mutex` for the
// duration of the write so it cannot interleave with other output that is
// emitted under the same mutex.
static inline void send_newline_and_flush(pthread_mutex_t *mutex) {
    netdata_mutex_lock(mutex);

    fputc('\n', stdout);
    fflush(stdout);

    netdata_mutex_unlock(mutex);
}

// Register a dynamic configuration entry, identified by `id`, on event loop `wg`.
// The implementation is not visible in this header, so the notes below are
// limited to what the parameter types show:
//   path                     - string path associated with the entry
//   status, type             - DYNCFG_STATUS / DYNCFG_TYPE of the entry
//   source_type, source      - DYNCFG_SOURCE_TYPE plus a string describing the source
//   cmds                     - DYNCFG_CMDS value
//                              NOTE(review): presumably the set of supported commands - confirm
//   view_access, edit_access - HTTP_ACCESS values
//                              NOTE(review): presumably required to view / edit the entry - confirm
//   cb, data                 - dyncfg_cb_t callback and an opaque pointer
//                              NOTE(review): presumably `data` is handed back to `cb` - confirm
void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path,
                                 DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds,
                                 HTTP_ACCESS view_access, HTTP_ACCESS edit_access,
                                 dyncfg_cb_t cb, void *data);

// Remove the dynamic configuration entry identified by `id` from event loop `wg`.
void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id);
// Report `status` for the dynamic configuration entry identified by `id`.
// NOTE(review): whether this updates local state, notifies the parent, or both
// is not visible here - confirm.
void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status);

#endif //NETDATA_FUNCTIONS_EVLOOP_H