diff options
Diffstat (limited to 'database/rrdfunctions.c')
-rw-r--r-- | database/rrdfunctions.c | 758 |
1 files changed, 758 insertions, 0 deletions
diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c new file mode 100644 index 0000000..fb847a3 --- /dev/null +++ b/database/rrdfunctions.c @@ -0,0 +1,758 @@ +#define NETDATA_RRD_INTERNALS +#include "rrd.h" + +#define MAX_FUNCTION_LENGTH (PLUGINSD_LINE_MAX - 512) // we need some space for the rest of the line + +static unsigned char functions_allowed_chars[256] = { + [0] = '\0', // + [1] = '_', // + [2] = '_', // + [3] = '_', // + [4] = '_', // + [5] = '_', // + [6] = '_', // + [7] = '_', // + [8] = '_', // + [9] = ' ', // Horizontal Tab + [10] = ' ', // Line Feed + [11] = ' ', // Vertical Tab + [12] = ' ', // Form Feed + [13] = ' ', // Carriage Return + [14] = '_', // + [15] = '_', // + [16] = '_', // + [17] = '_', // + [18] = '_', // + [19] = '_', // + [20] = '_', // + [21] = '_', // + [22] = '_', // + [23] = '_', // + [24] = '_', // + [25] = '_', // + [26] = '_', // + [27] = '_', // + [28] = '_', // + [29] = '_', // + [30] = '_', // + [31] = '_', // + [32] = ' ', // SPACE keep + [33] = '_', // ! + [34] = '_', // " + [35] = '_', // # + [36] = '_', // $ + [37] = '_', // % + [38] = '_', // & + [39] = '_', // ' + [40] = '_', // ( + [41] = '_', // ) + [42] = '_', // * + [43] = '_', // + + [44] = ',', // , keep + [45] = '-', // - keep + [46] = '.', // . keep + [47] = '/', // / keep + [48] = '0', // 0 keep + [49] = '1', // 1 keep + [50] = '2', // 2 keep + [51] = '3', // 3 keep + [52] = '4', // 4 keep + [53] = '5', // 5 keep + [54] = '6', // 6 keep + [55] = '7', // 7 keep + [56] = '8', // 8 keep + [57] = '9', // 9 keep + [58] = ':', // : keep + [59] = ':', // ; convert ; to : + [60] = '_', // < + [61] = ':', // = convert = to : + [62] = '_', // > + [63] = '_', // ? + [64] = '_', // @ + [65] = 'A', // A keep + [66] = 'B', // B keep + [67] = 'C', // C keep + [68] = 'D', // D keep + [69] = 'E', // E keep + [70] = 'F', // F keep + [71] = 'G', // G keep + [72] = 'H', // H keep + [73] = 'I', // I keep + [74] = 'J', // J keep + [75] = 'K', // K keep + [76] = 'L', // L keep + [77] = 'M', // M keep + [78] = 'N', // N keep + [79] = 'O', // O keep + [80] = 'P', // P keep + [81] = 'Q', // Q keep + [82] = 'R', // R keep + [83] = 'S', // S keep + [84] = 'T', // T keep + [85] = 'U', // U keep + [86] = 'V', // V keep + [87] = 'W', // W keep + [88] = 'X', // X keep + [89] = 'Y', // Y keep + [90] = 'Z', // Z keep + [91] = '_', // [ + [92] = '/', // backslash convert \ to / + [93] = '_', // ] + [94] = '_', // ^ + [95] = '_', // _ keep + [96] = '_', // ` + [97] = 'a', // a keep + [98] = 'b', // b keep + [99] = 'c', // c keep + [100] = 'd', // d keep + [101] = 'e', // e keep + [102] = 'f', // f keep + [103] = 'g', // g keep + [104] = 'h', // h keep + [105] = 'i', // i keep + [106] = 'j', // j keep + [107] = 'k', // k keep + [108] = 'l', // l keep + [109] = 'm', // m keep + [110] = 'n', // n keep + [111] = 'o', // o keep + [112] = 'p', // p keep + [113] = 'q', // q keep + [114] = 'r', // r keep + [115] = 's', // s keep + [116] = 't', // t keep + [117] = 'u', // u keep + [118] = 'v', // v keep + [119] = 'w', // w keep + [120] = 'x', // x keep + [121] = 'y', // y keep + [122] = 'z', // z keep + [123] = '_', // { + [124] = '_', // | + [125] = '_', // } + [126] = '_', // ~ + [127] = '_', // + [128] = '_', // + [129] = '_', // + [130] = '_', // + [131] = '_', // + [132] = '_', // + [133] = '_', // + [134] = '_', // + [135] = '_', // + [136] = '_', // + [137] = '_', // + [138] = '_', // + [139] = '_', // + [140] = '_', // + [141] = '_', // + [142] = '_', // + [143] = '_', // + [144] = '_', // + [145] = '_', // + [146] = '_', // + [147] = '_', // + [148] = '_', // + [149] = '_', // + [150] = '_', // + [151] = '_', // + [152] = '_', // + [153] = '_', // + [154] = '_', // + [155] = '_', // + [156] = '_', // + [157] = '_', // + [158] = '_', // + [159] = '_', // + [160] = '_', // + [161] = '_', // + [162] = '_', // + [163] = '_', // + [164] = '_', // + [165] = '_', // + [166] = '_', // + [167] = '_', // + [168] = '_', // + [169] = '_', // + [170] = '_', // + [171] = '_', // + [172] = '_', // + [173] = '_', // + [174] = '_', // + [175] = '_', // + [176] = '_', // + [177] = '_', // + [178] = '_', // + [179] = '_', // + [180] = '_', // + [181] = '_', // + [182] = '_', // + [183] = '_', // + [184] = '_', // + [185] = '_', // + [186] = '_', // + [187] = '_', // + [188] = '_', // + [189] = '_', // + [190] = '_', // + [191] = '_', // + [192] = '_', // + [193] = '_', // + [194] = '_', // + [195] = '_', // + [196] = '_', // + [197] = '_', // + [198] = '_', // + [199] = '_', // + [200] = '_', // + [201] = '_', // + [202] = '_', // + [203] = '_', // + [204] = '_', // + [205] = '_', // + [206] = '_', // + [207] = '_', // + [208] = '_', // + [209] = '_', // + [210] = '_', // + [211] = '_', // + [212] = '_', // + [213] = '_', // + [214] = '_', // + [215] = '_', // + [216] = '_', // + [217] = '_', // + [218] = '_', // + [219] = '_', // + [220] = '_', // + [221] = '_', // + [222] = '_', // + [223] = '_', // + [224] = '_', // + [225] = '_', // + [226] = '_', // + [227] = '_', // + [228] = '_', // + [229] = '_', // + [230] = '_', // + [231] = '_', // + [232] = '_', // + [233] = '_', // + [234] = '_', // + [235] = '_', // + [236] = '_', // + [237] = '_', // + [238] = '_', // + [239] = '_', // + [240] = '_', // + [241] = '_', // + [242] = '_', // + [243] = '_', // + [244] = '_', // + [245] = '_', // + [246] = '_', // + [247] = '_', // + [248] = '_', // + [249] = '_', // + [250] = '_', // + [251] = '_', // + [252] = '_', // + [253] = '_', // + [254] = '_', // + [255] = '_' // +}; + +static inline size_t sanitize_function_text(char *dst, const char *src, size_t dst_len) { + return text_sanitize((unsigned char *)dst, (const unsigned char *)src, dst_len, + functions_allowed_chars, true, "", NULL); +} + +// we keep a dictionary per RRDSET with these functions +// the dictionary is created on demand (only when a function is added to an RRDSET) + +typedef enum { + RRD_FUNCTION_LOCAL = (1 << 0), + RRD_FUNCTION_GLOBAL = (1 << 1), + + // this is 8-bit +} RRD_FUNCTION_OPTIONS; + +struct rrd_collector_function { + bool sync; // when true, the function is called synchronously + uint8_t options; // RRD_FUNCTION_OPTIONS + STRING *help; + int timeout; // the default timeout of the function + + int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data, + function_data_ready_callback callback, void *callback_data); + + void *collector_data; + struct rrd_collector *collector; +}; + +// Each function points to this collector structure +// so that when the collector exits, all of them will +// be invalidated (running == false) +// The last function that is using this collector +// frees the structure too (or when the collector calls +// rrdset_collector_finished()). + +struct rrd_collector { + int32_t refcount; + pid_t tid; + bool running; +}; + +// Each thread that adds RRDSET functions, has to call +// rrdset_collector_started() and rrdset_collector_finished() +// to create the collector structure. + +static __thread struct rrd_collector *thread_rrd_collector = NULL; + +static void rrd_collector_free(struct rrd_collector *rdc) { + int32_t expected = 0; + if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) { + // the collector is still referenced by charts. + // leave it hanging there, the last chart will actually free it. + return; + } + + // we can free it now + freez(rdc); +} + +// called once per collector +void rrd_collector_started(void) { + if(likely(thread_rrd_collector)) return; + + thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); + thread_rrd_collector->tid = gettid(); + thread_rrd_collector->running = true; +} + +// called once per collector +void rrd_collector_finished(void) { + if(!thread_rrd_collector) + return; + + thread_rrd_collector->running = false; + rrd_collector_free(thread_rrd_collector); + thread_rrd_collector = NULL; +} + +static struct rrd_collector *rrd_collector_acquire(void) { + __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST); + return thread_rrd_collector; +} + +static void rrd_collector_release(struct rrd_collector *rdc) { + if(unlikely(!rdc)) return; + + int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST); + if(refcount == 0 && !rdc->running) + rrd_collector_free(rdc); +} + +static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, + void *rrdhost __maybe_unused) { + struct rrd_collector_function *rdcf = func; + + if(!thread_rrd_collector) + fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", + __FUNCTION__, dictionary_acquired_item_name(item)); + + rdcf->collector = rrd_collector_acquire(); +} + +static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, + void *rrdhost __maybe_unused) { + struct rrd_collector_function *rdcf = func; + rrd_collector_release(rdcf->collector); +} + +static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, + void *new_func __maybe_unused, void *rrdhost __maybe_unused) { + struct rrd_collector_function *rdcf = func; + struct rrd_collector_function *new_rdcf = new_func; + + if(!thread_rrd_collector) + fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", + __FUNCTION__, dictionary_acquired_item_name(item)); + + bool changed = false; + + if(rdcf->collector != thread_rrd_collector) { + struct rrd_collector *old_rdc = rdcf->collector; + rdcf->collector = rrd_collector_acquire(); + rrd_collector_release(old_rdc); + changed = true; + } + + if(rdcf->function != new_rdcf->function) { + rdcf->function = new_rdcf->function; + changed = true; + } + + if(rdcf->help != new_rdcf->help) { + STRING *old = rdcf->help; + rdcf->help = new_rdcf->help; + string_freez(old); + changed = true; + } + else + string_freez(new_rdcf->help); + + if(rdcf->timeout != new_rdcf->timeout) { + rdcf->timeout = new_rdcf->timeout; + changed = true; + } + + if(rdcf->sync != new_rdcf->sync) { + rdcf->sync = new_rdcf->sync; + changed = true; + } + + if(rdcf->collector_data != new_rdcf->collector_data) { + rdcf->collector_data = new_rdcf->collector_data; + changed = true; + } + + return changed; +} + + +void rrdfunctions_init(RRDHOST *host) { + if(host->functions) return; + + host->functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host); + dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host); + dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host); +} + +void rrdfunctions_destroy(RRDHOST *host) { + dictionary_destroy(host->functions); +} + +void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, + bool sync, function_execute_at_collector function, void *collector_data) { + + // RRDSET *st may be NULL in this function + // to create a GLOBAL function + + if(st && !st->functions_view) + st->functions_view = dictionary_create_view(host->functions); + + char key[PLUGINSD_LINE_MAX + 1]; + sanitize_function_text(key, name, PLUGINSD_LINE_MAX); + + struct rrd_collector_function tmp = { + .sync = sync, + .timeout = timeout, + .options = (st)?RRD_FUNCTION_LOCAL:RRD_FUNCTION_GLOBAL, + .function = function, + .collector_data = collector_data, + .help = string_strdupz(help), + }; + const DICTIONARY_ITEM *item = dictionary_set_and_acquire_item(host->functions, key, &tmp, sizeof(tmp)); + + if(st) + dictionary_view_set(st->functions_view, key, item); + + dictionary_acquired_item_release(host->functions, item); +} + +void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { + if(!st->functions_view) + return; + + struct rrd_collector_function *tmp; + dfe_start_read(st->functions_view, tmp) { + buffer_sprintf(wb + , PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n" + , tmp_dfe.name + , tmp->timeout + , string2str(tmp->help) + ); + } + dfe_done(tmp); +} + +struct rrd_function_call_wait { + bool free_with_signal; + bool data_are_ready; + netdata_mutex_t mutex; + pthread_cond_t cond; + int code; +}; + +static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { + pthread_cond_destroy(&tmp->cond); + netdata_mutex_destroy(&tmp->mutex); + freez(tmp); +} + +struct { + const char *format; + uint8_t content_type; +} function_formats[] = { + { .format = "application/json", CT_APPLICATION_JSON }, + { .format = "text/plain", CT_TEXT_PLAIN }, + { .format = "application/xml", CT_APPLICATION_XML }, + { .format = "prometheus", CT_PROMETHEUS }, + { .format = "text", CT_TEXT_PLAIN }, + { .format = "txt", CT_TEXT_PLAIN }, + { .format = "json", CT_APPLICATION_JSON }, + { .format = "html", CT_TEXT_HTML }, + { .format = "text/html", CT_TEXT_HTML }, + { .format = "xml", CT_APPLICATION_XML }, + + // terminator + { .format = NULL, CT_TEXT_PLAIN }, +}; + +uint8_t functions_format_to_content_type(const char *format) { + if(format && *format) { + for (int i = 0; function_formats[i].format; i++) + if (strcmp(function_formats[i].format, format) == 0) + return function_formats[i].content_type; + } + + return CT_TEXT_PLAIN; +} + +const char *functions_content_type_to_format(uint8_t content_type) { + for (int i = 0; function_formats[i].format; i++) + if (function_formats[i].content_type == content_type) + return function_formats[i].format; + + return "text/plain"; +} + +int rrd_call_function_error(BUFFER *wb, const char *msg, int code) { + char buffer[PLUGINSD_LINE_MAX]; + json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); + + buffer_flush(wb); + buffer_sprintf(wb, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); + wb->contenttype = CT_APPLICATION_JSON; + buffer_no_cacheable(wb); + return code; +} + +static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, struct rrd_collector_function **rdcf) { + char buffer[MAX_FUNCTION_LENGTH + 1]; + + strncpyz(buffer, name, MAX_FUNCTION_LENGTH); + char *s = NULL; + + *rdcf = NULL; + while(!(*rdcf) && buffer[0]) { + *rdcf = dictionary_get(host->functions, buffer); + if(*rdcf) break; + + // if s == NULL, set it to the end of the buffer + // this should happen only the first time + if(unlikely(!s)) + s = &buffer[key_length - 1]; + + // skip a word from the end + while(s >= buffer && !isspace(*s)) *s-- = '\0'; + + // skip all spaces + while(s >= buffer && isspace(*s)) *s-- = '\0'; + } + + buffer_flush(wb); + + if(!(*rdcf)) + return rrd_call_function_error(wb, "No collector is supplying this function on this host at this time.", HTTP_RESP_NOT_FOUND); + + if(!(*rdcf)->collector->running) + return rrd_call_function_error(wb, "The collector that registered this function, is not currently running.", HTTP_RESP_BACKEND_FETCH_FAILED); + + return HTTP_RESP_OK; +} + +static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { + struct rrd_function_call_wait *tmp = callback_data; + bool we_should_free = false; + + netdata_mutex_lock(&tmp->mutex); + + // since we got the mutex, + // the waiting thread is either in pthread_cond_timedwait() + // or gave up and left. + + tmp->code = code; + tmp->data_are_ready = true; + + if(tmp->free_with_signal) + we_should_free = true; + + pthread_cond_signal(&tmp->cond); + + netdata_mutex_unlock(&tmp->mutex); + + if(we_should_free) { + buffer_free(temp_wb); + rrd_function_call_wait_free(tmp); + } +} + +int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name) { + int code; + + struct rrd_collector_function *rdcf = NULL; + + char key[PLUGINSD_LINE_MAX + 1]; + size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); + code = rrd_call_function_find(host, wb, key, key_length, &rdcf); + if(code != HTTP_RESP_OK) + return code; + + if(timeout <= 0) + timeout = rdcf->timeout; + + struct timespec tp; + clock_gettime(CLOCK_REALTIME, &tp); + tp.tv_sec += (time_t)timeout; + + if(rdcf->sync) { + code = rdcf->function(wb, timeout, key, rdcf->collector_data, NULL, NULL); + } + else { + struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); + tmp->free_with_signal = false; + tmp->data_are_ready = false; + netdata_mutex_init(&tmp->mutex); + pthread_cond_init(&tmp->cond, NULL); + + bool we_should_free = true; + BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1); // we need it because we may give up on it + temp_wb->contenttype = wb->contenttype; + code = rdcf->function(temp_wb, timeout, key, rdcf->collector_data, rrd_call_function_signal_when_ready, tmp); + if (code == HTTP_RESP_OK) { + netdata_mutex_lock(&tmp->mutex); + + int rc = 0; + while (rc == 0 && !tmp->data_are_ready) { + // the mutex is unlocked within pthread_cond_timedwait() + rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); + // the mutex is again ours + } + + if (tmp->data_are_ready) { + // we have a response + buffer_fast_strcat(wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb)); + wb->contenttype = temp_wb->contenttype; + wb->expires = temp_wb->expires; + + if(wb->expires) + buffer_cacheable(wb); + else + buffer_no_cacheable(wb); + + code = tmp->code; + } + else if (rc == ETIMEDOUT) { + // timeout + // we will go away and let the callback free the structure + tmp->free_with_signal = true; + we_should_free = false; + code = rrd_call_function_error(wb, "Timeout while waiting for a response from the collector.", HTTP_RESP_GATEWAY_TIMEOUT); + } + else + code = rrd_call_function_error(wb, "Failed to get the response from the collector.", HTTP_RESP_INTERNAL_SERVER_ERROR); + + netdata_mutex_unlock(&tmp->mutex); + } + else { + buffer_free(temp_wb); + if(!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to send request to the collector.", code); + } + + if (we_should_free) + rrd_function_call_wait_free(tmp); + } + + return code; +} + +int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name, + rrd_call_function_async_callback callback, void *callback_data) { + int code; + + struct rrd_collector_function *rdcf = NULL; + char key[PLUGINSD_LINE_MAX + 1]; + size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); + code = rrd_call_function_find(host, wb, key, key_length, &rdcf); + if(code != HTTP_RESP_OK) + return code; + + if(timeout <= 0) + timeout = rdcf->timeout; + + code = rdcf->function(wb, timeout, key, rdcf->collector_data, callback, callback_data); + + if(code != HTTP_RESP_OK) { + if (!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to send request to the collector.", code); + } + + return code; +} + +static void functions2json(DICTIONARY *functions, BUFFER *wb, const char *ident, const char *kq, const char *sq) { + struct rrd_collector_function *t; + dfe_start_read(functions, t) { + if(!t->collector->running) continue; + + if(t_dfe.counter) + buffer_strcat(wb, ",\n"); + + buffer_sprintf(wb, "%s%s%s%s: {", ident, kq, t_dfe.name, kq); + buffer_sprintf(wb, "\n\t%s%shelp%s: %s%s%s", ident, kq, kq, sq, string2str(t->help), sq); + buffer_sprintf(wb, ",\n\t%s%stimeout%s: %d", ident, kq, kq, t->timeout); + buffer_sprintf(wb, ",\n\t%s%soptions%s: \"%s%s\"", ident, kq, kq + , (t->options & RRD_FUNCTION_LOCAL)?"LOCAL ":"" + , (t->options & RRD_FUNCTION_GLOBAL)?"GLOBAL ":"" + ); + buffer_sprintf(wb, "\n%s}", ident); + } + dfe_done(t); + buffer_strcat(wb, "\n"); +} + +void chart_functions2json(RRDSET *st, BUFFER *wb, int tabs, const char *kq, const char *sq) { + if(!st || !st->functions_view) return; + + char ident[tabs + 1]; + ident[tabs] = '\0'; + while(tabs) ident[--tabs] = '\t'; + + functions2json(st->functions_view, wb, ident, kq, sq); +} + +void host_functions2json(RRDHOST *host, BUFFER *wb, int tabs, const char *kq, const char *sq) { + if(!host || !host->functions) return; + + char ident[tabs + 1]; + ident[tabs] = '\0'; + while(tabs) ident[--tabs] = '\t'; + + functions2json(host->functions, wb, ident, kq, sq); +} + +void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst) { + if(!rrdset_functions_view || !dst) return; + + struct rrd_collector_function *t; + dfe_start_read(rrdset_functions_view, t) { + if(!t->collector->running) continue; + + dictionary_set(dst, t_dfe.name, NULL, 0); + } + dfe_done(t); +} |