diff options
Diffstat (limited to 'fluent-bit/src/http_server/api/v1/metrics.c')
-rw-r--r-- | fluent-bit/src/http_server/api/v1/metrics.c | 579 |
1 files changed, 579 insertions, 0 deletions
diff --git a/fluent-bit/src/http_server/api/v1/metrics.c b/fluent-bit/src/http_server/api/v1/metrics.c new file mode 100644 index 00000000..4a541eaa --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/metrics.c @@ -0,0 +1,579 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_time.h> +#include "metrics.h" + +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +#define null_check(x) do { if (!x) { goto error; } else {sds = x;} } while (0) + +pthread_key_t hs_metrics_key; + +static struct mk_list *hs_metrics_key_create() +{ + struct mk_list *metrics_list = NULL; + + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (metrics_list == NULL) { + flb_errno(); + return NULL; + } + mk_list_init(metrics_list); + pthread_setspecific(hs_metrics_key, metrics_list); + + return metrics_list; +} + +static void hs_metrics_key_destroy(void *data) +{ + struct mk_list *metrics_list = (struct mk_list*)data; + struct mk_list *tmp; + struct mk_list *head; + struct flb_hs_buf *entry; + + if (metrics_list == NULL) { + return; + } + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != NULL) { + if (entry->raw_data != NULL) { + flb_free(entry->raw_data); + entry->raw_data = NULL; + } + if (entry->data) { + flb_sds_destroy(entry->data); + entry->data = NULL; + } + mk_list_del(&entry->_head); + flb_free(entry); + } + } + + flb_free(metrics_list); +} + +/* Return the newest metrics buffer */ +static struct flb_hs_buf *metrics_get_latest() +{ + struct flb_hs_buf *buf; + struct mk_list *metrics_list; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + return NULL; + } + + if (mk_list_size(metrics_list) == 0) { + return NULL; + } + + buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head); + return buf; +} + +/* Delete unused metrics, note that we only care about the latest node */ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct flb_hs_buf *last; + struct flb_hs_buf *entry; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + return -1; + } + + last = metrics_get_latest(); + if (!last) { + return -1; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != last && entry->users == 0) { + mk_list_del(&entry->_head); + flb_sds_destroy(entry->data); + flb_free(entry->raw_data); + flb_free(entry); + c++; + } + } + + return c; +} + +/* + * Callback invoked every time some metrics are received through a + * message queue channel. This function runs in a Monkey HTTP thread + * worker and it purpose is to take the metrics data and store it + * somewhere so then it can be available by the end-points upon + * HTTP client requests. + */ +static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size) +{ + flb_sds_t out_data; + struct flb_hs_buf *buf; + struct mk_list *metrics_list = NULL; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + metrics_list = hs_metrics_key_create(); + if (metrics_list == NULL) { + return; + } + } + + /* Convert msgpack to JSON */ + out_data = flb_msgpack_raw_to_json_sds(data, size); + if (!out_data) { + return; + } + + buf = flb_malloc(sizeof(struct flb_hs_buf)); + if (!buf) { + flb_errno(); + flb_sds_destroy(out_data); + return; + } + buf->users = 0; + buf->data = out_data; + + buf->raw_data = flb_malloc(size); + if (!buf->raw_data) { + flb_errno(); + flb_sds_destroy(out_data); + flb_free(buf); + return; + } + memcpy(buf->raw_data, data, size); + buf->raw_size = size; + + mk_list_add(&buf->_head, metrics_list); + + cleanup_metrics(); +} + +int string_cmp(const void* a_arg, const void* b_arg) { + char *a = *(char **)a_arg; + char *b = *(char **)b_arg; + + return strcmp(a, b); +} + +size_t extract_metric_name_end_position(char *s) { + int i; + + for (i = 0; i < flb_sds_len(s); i++) { + if (s[i] == '{') { + return i; + } + } + return 0; +} + +int is_same_metric(char *s1, char *s2) { + int i; + int p1 = extract_metric_name_end_position(s1); + int p2 = extract_metric_name_end_position(s2); + + if (p1 != p2) { + return 0; + } + + for (i = 0; i < p1; i++) { + if (s1[i] != s2[i]) { + return 0; + } + } + return 1; +} + +/* derive HELP text from metricname */ +/* if help text length > 128, increase init memory for metric_helptxt */ +flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt) +{ + if (strstr(metric_name, "input_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of input bytes.\n", 24); + } + else if (strstr(metric_name, "input_records")) { + return flb_sds_cat(*metric_helptxt, " Number of input records.\n", 26); + } + else if (strstr(metric_name, "output_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of output bytes.\n", 25); + } + else if (strstr(metric_name, "output_records")) { + return flb_sds_cat(*metric_helptxt, " Number of output records.\n", 27); + } + else if (strstr(metric_name, "output_errors")) { + return flb_sds_cat(*metric_helptxt, " Number of output errors.\n", 26); + } + else if (strstr(metric_name, "output_retries_failed")) { + return flb_sds_cat(*metric_helptxt, " Number of abandoned batches because the maximum number of re-tries was reached.\n", 81); + } + else if (strstr(metric_name, "output_retries")) { + return flb_sds_cat(*metric_helptxt, " Number of output retries.\n", 27); + } + else if (strstr(metric_name, "output_proc_records")) { + return flb_sds_cat(*metric_helptxt, " Number of processed output records.\n", 37); + } + else if (strstr(metric_name, "output_proc_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of processed output bytes.\n", 35); + } + else if (strstr(metric_name, "output_dropped_records")) { + return flb_sds_cat(*metric_helptxt, " Number of dropped records.\n", 28); + } + else if (strstr(metric_name, "output_retried_records")) { + return flb_sds_cat(*metric_helptxt, " Number of retried records.\n", 28); + } + else { + return (flb_sds_cat(*metric_helptxt, " Fluentbit metrics.\n", 20)); + } +} + +/* API: expose metrics in Prometheus format /api/v1/metrics/prometheus */ +void cb_metrics_prometheus(mk_request_t *request, void *data) +{ + int i; + int j; + int m; + int len; + int time_len; + int start_time_len; + uint64_t uptime; + size_t index; + size_t num_metrics = 0; + long now; + flb_sds_t sds; + flb_sds_t sds_metric; + flb_sds_t tmp_sds; + struct flb_sds *metric_helptxt_head; + flb_sds_t metric_helptxt; + size_t off = 0; + struct flb_hs_buf *buf; + msgpack_unpacked result; + msgpack_object map; + char tmp[32]; + char time_str[64]; + char start_time_str[64]; + char* *metrics_arr; + struct flb_time tp; + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + /* ref count */ + buf->users++; + + /* Compose outgoing buffer string */ + sds = flb_sds_create_size(1024); + if (!sds) { + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + return; + } + + /* length of HELP text */ + metric_helptxt = flb_sds_create_size(128); + if (!metric_helptxt) { + flb_sds_destroy(sds); + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + return; + } + metric_helptxt_head = FLB_SDS_HEADER(metric_helptxt); + + /* + * fluentbit_input_records[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP + * fluentbit_input_bytes[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP + */ + index = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, buf->raw_data, buf->raw_size, &off); + map = result.data; + + /* we need to know number of exposed metrics to reserve a memory */ + for (i = 0; i < map.via.map.size; i++) { + msgpack_object v = map.via.map.ptr[i].val; + /* Iterate sub-map */ + for (j = 0; j < v.via.map.size; j++) { + msgpack_object sv = v.via.map.ptr[j].val; + for (m = 0; m < sv.via.map.size; m++) { + num_metrics++; + } + } + } + metrics_arr = flb_malloc(num_metrics * sizeof(char*)); + if (!metrics_arr) { + flb_errno(); + + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + msgpack_unpacked_destroy(&result); + return; + } + + flb_time_get(&tp); + now = flb_time_to_nanosec(&tp) / 1000000; /* in milliseconds */ + time_len = snprintf(time_str, sizeof(time_str) - 1, "%lu", now); + + for (i = 0; i < map.via.map.size; i++) { + msgpack_object k; + msgpack_object v; + + /* Keys: input, output */ + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + /* Iterate sub-map */ + for (j = 0; j < v.via.map.size; j++) { + msgpack_object sk; + msgpack_object sv; + + /* Keys: plugin name , values: metrics */ + sk = v.via.map.ptr[j].key; + sv = v.via.map.ptr[j].val; + + for (m = 0; m < sv.via.map.size; m++) { + msgpack_object mk; + msgpack_object mv; + + mk = sv.via.map.ptr[m].key; + mv = sv.via.map.ptr[m].val; + + /* Convert metric value to string */ + len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 " ", mv.via.u64); + if (len < 0) { + goto error; + } + + /* Allocate buffer */ + sds_metric = flb_sds_create_size(k.via.str.size + + mk.via.str.size + + sk.via.str.size + + len + time_len + 28); + if (sds_metric == NULL) { + goto error; + } + + sds_metric = flb_sds_cat(sds_metric, "fluentbit_", 10); + sds_metric = flb_sds_cat(sds_metric, k.via.str.ptr, k.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "_", 1); + sds_metric = flb_sds_cat(sds_metric, mk.via.str.ptr, mk.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "_total{name=\"", 13); + sds_metric = flb_sds_cat(sds_metric, sk.via.str.ptr, sk.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "\"} ", 3); + sds_metric = flb_sds_cat(sds_metric, tmp, len); + sds_metric = flb_sds_cat(sds_metric, time_str, time_len); + sds_metric = flb_sds_cat(sds_metric, "\n", 1); + metrics_arr[index] = sds_metric; + index++; + } + } + } + + /* Sort metrics in alphabetic order, so we can group them later. */ + qsort(metrics_arr, num_metrics, sizeof(char *), string_cmp); + + /* When a new metric starts add HELP and TYPE annotation. */ + tmp_sds = flb_sds_cat(sds, "# HELP ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0])); + null_check(tmp_sds); + if (!metrics_help_txt(metrics_arr[0], &metric_helptxt)) { + goto error; + } + tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0])); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, " counter\n", 9); + null_check(tmp_sds); + + for (i = 0; i < num_metrics; i++) { + tmp_sds = flb_sds_cat(sds, metrics_arr[i], strlen(metrics_arr[i])); + null_check(tmp_sds); + if ((i != num_metrics - 1) && (is_same_metric(metrics_arr[i], metrics_arr[i+1]) == 0)) { + tmp_sds = flb_sds_cat(sds, "# HELP ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1])); + null_check(tmp_sds); + metric_helptxt_head->len = 0; + if (!metrics_help_txt(metrics_arr[i+1], &metric_helptxt)) { + goto error; + } + tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1])); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, " counter\n", 9); + null_check(tmp_sds); + } + } + + /* Attach uptime */ + uptime = time(NULL) - config->init_time; + len = snprintf(time_str, sizeof(time_str) - 1, "%lu", uptime); + + tmp_sds = flb_sds_cat(sds, + "# HELP fluentbit_uptime Number of seconds that Fluent Bit has " + "been running.\n", 76); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_uptime counter\n", 32); + null_check(tmp_sds); + + tmp_sds = flb_sds_cat(sds, "fluentbit_uptime ", 17); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, time_str, len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\n", 1); + null_check(tmp_sds); + + /* Attach process_start_time_seconds metric. */ + start_time_len = snprintf(start_time_str, sizeof(start_time_str) - 1, + "%lu", config->init_time); + + tmp_sds = flb_sds_cat(sds, "# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n", 89); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE process_start_time_seconds gauge\n", 40); + null_check(tmp_sds); + + tmp_sds = flb_sds_cat(sds, "process_start_time_seconds ", 27); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, start_time_str, start_time_len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\n", 1); + null_check(tmp_sds); + + /* Attach fluentbit_build_info metric. */ + tmp_sds = flb_sds_cat(sds, "# HELP fluentbit_build_info Build version information.\n", 55); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_build_info gauge\n", 34); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "fluentbit_build_info{version=\"", 30); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\",edition=\"", 11); + null_check(tmp_sds); +#ifdef FLB_ENTERPRISE + tmp_sds = flb_sds_cat(sds, "Enterprise\"} 1\n", 15); + null_check(tmp_sds); +#else + tmp_sds = flb_sds_cat(sds, "Community\"} 1\n", 14); + null_check(tmp_sds); +#endif + + msgpack_unpacked_destroy(&result); + buf->users--; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS); + mk_http_send(request, sds, flb_sds_len(sds), NULL); + for (i = 0; i < num_metrics; i++) { + flb_sds_destroy(metrics_arr[i]); + } + flb_free(metrics_arr); + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + + mk_http_done(request); + return; + +error: + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + + for (i = 0; i < index; i++) { + flb_sds_destroy(metrics_arr[i]); + } + flb_free(metrics_arr); + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + msgpack_unpacked_destroy(&result); +} + +/* API: expose built-in metrics /api/v1/metrics */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct flb_hs_buf *buf; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL); + mk_http_done(request); + + buf->users--; +} + +/* Perform registration */ +int api_v1_metrics(struct flb_hs *hs) +{ + + pthread_key_create(&hs_metrics_key, hs_metrics_key_destroy); + + /* Create a message queue */ + hs->qid_metrics = mk_mq_create(hs->ctx, "/metrics", + cb_mq_metrics, NULL); + + /* HTTP end-points */ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics/prometheus", + cb_metrics_prometheus, hs); + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics", cb_metrics, hs); + + return 0; +} |