diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_prometheus_exporter/prom_http.c')
-rw-r--r-- | src/fluent-bit/plugins/out_prometheus_exporter/prom_http.c | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_prometheus_exporter/prom_http.c b/src/fluent-bit/plugins/out_prometheus_exporter/prom_http.c new file mode 100644 index 000000000..7ff3f8200 --- /dev/null +++ b/src/fluent-bit/plugins/out_prometheus_exporter/prom_http.c @@ -0,0 +1,268 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_server.h> +#include "prom.h" +#include "prom_http.h" + +pthread_key_t ph_metrics_key; + +/* Return the newest storage metrics buffer */ +static struct prom_http_buf *metrics_get_latest() +{ + struct prom_http_buf *buf; + struct mk_list *metrics_list; + + metrics_list = pthread_getspecific(ph_metrics_key); + if (!metrics_list) { + return NULL; + } + + if (mk_list_size(metrics_list) == 0) { + return NULL; + } + + buf = mk_list_entry_last(metrics_list, struct prom_http_buf, _head); + return buf; +} + +/* Delete unused metrics, note that we only care about the latest node */ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct prom_http_buf *last; + struct prom_http_buf *entry; + + metrics_list = pthread_getspecific(ph_metrics_key); + if (!metrics_list) { + return -1; + } + + last = metrics_get_latest(); + if (!last) { + return -1; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct prom_http_buf, _head); + if (entry != last && entry->users == 0) { + mk_list_del(&entry->_head); + flb_free(entry->buf_data); + flb_free(entry); + c++; + } + } + + return c; +} + +/* destructor callback */ +static void destruct_metrics(void *data) +{ + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list = (struct mk_list*)data; + struct prom_http_buf *entry; + + if (!metrics_list) { + return; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct prom_http_buf, _head); + mk_list_del(&entry->_head); + flb_free(entry->buf_data); + flb_free(entry); + } + + flb_free(metrics_list); +} + +/* + * Callback invoked every time a new payload of Metrics is received from + * Fluent Bit engine through Message Queue channel. + * + * This function runs in a Monkey HTTP thread worker and it purpose is + * to take the metrics data and store it locally for every thread, so then + * it can be available on 'cb_metrics()' to serve it as a response. + */ +static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size) +{ + struct prom_http_buf *buf; + struct mk_list *metrics_list = NULL; + + metrics_list = pthread_getspecific(ph_metrics_key); + if (!metrics_list) { + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (!metrics_list) { + flb_errno(); + return; + } + mk_list_init(metrics_list); + pthread_setspecific(ph_metrics_key, metrics_list); + } + + /* FIXME: convert data ? */ + buf = flb_malloc(sizeof(struct prom_http_buf)); + if (!buf) { + flb_errno(); + return; + } + buf->users = 0; + buf->buf_data = flb_malloc(size); + if (!buf->buf_data) { + flb_errno(); + flb_free(buf); + return; + } + memcpy(buf->buf_data, data, size); + buf->buf_size = size; + + mk_list_add(&buf->_head, metrics_list); + cleanup_metrics(); +} + +/* Create message queue to receive Metrics payload from the engine */ +static int http_server_mq_create(struct prom_http *ph) +{ + int ret; + + pthread_key_create(&ph_metrics_key, destruct_metrics); + + ret = mk_mq_create(ph->ctx, "/metrics", cb_mq_metrics, NULL); + if (ret == -1) { + return -1; + } + ph->qid_metrics = ret; + return 0; +} + +/* HTTP endpoint: /metrics */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct prom_http_buf *buf; + (void) data; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS); + mk_http_send(request, buf->buf_data, buf->buf_size, NULL); + mk_http_done(request); + + buf->users--; +} + +/* HTTP endpoint: / (root) */ +static void cb_root(mk_request_t *request, void *data) +{ + (void) data; + + mk_http_status(request, 200); + mk_http_send(request, "Fluent Bit Prometheus Exporter\n", 31, NULL); + mk_http_done(request); +} + +struct prom_http *prom_http_server_create(struct prom_exporter *ctx, + const char *listen, + int tcp_port, + struct flb_config *config) +{ + int ret; + int vid; + char tmp[32]; + struct prom_http *ph; + + ph = flb_malloc(sizeof(struct prom_http)); + if (!ph) { + flb_errno(); + return NULL; + } + ph->config = config; + + /* HTTP Server context */ + ph->ctx = mk_create(); + if (!ph->ctx) { + flb_free(ph); + return NULL; + } + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) -1, "%s:%d", listen, tcp_port); + mk_config_set(ph->ctx, + "Listen", tmp, + "Workers", "1", + NULL); + + /* Virtual host */ + vid = mk_vhost_create(ph->ctx, NULL); + ph->vid = vid; + + /* Set HTTP URI callbacks */ + mk_vhost_handler(ph->ctx, vid, "/metrics", cb_metrics, NULL); + mk_vhost_handler(ph->ctx, vid, "/", cb_root, NULL); + + /* Create a Message Queue to push 'metrics' to HTTP workers */ + ret = http_server_mq_create(ph); + if (ret == -1) { + mk_destroy(ph->ctx); + flb_free(ph); + return NULL; + } + + return ph; +} + +void prom_http_server_destroy(struct prom_http *ph) +{ + if (ph) { + /* TODO: release mk_vhost */ + if (ph->ctx) { + mk_destroy(ph->ctx); + } + flb_free(ph); + } +} + +int prom_http_server_start(struct prom_http *ph) +{ + return mk_start(ph->ctx); +} + +int prom_http_server_stop(struct prom_http *ph) +{ + return mk_stop(ph->ctx); +} + +int prom_http_server_mq_push_metrics(struct prom_http *ph, + void *data, size_t size) +{ + return mk_mq_send(ph->ctx, ph->qid_metrics, data, size); +} |