summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_metrics_exporter.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_metrics_exporter.c')
-rw-r--r--fluent-bit/src/flb_metrics_exporter.c336
1 files changed, 336 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_metrics_exporter.c b/fluent-bit/src/flb_metrics_exporter.c
new file mode 100644
index 000000000..8560fe6ee
--- /dev/null
+++ b/fluent-bit/src/flb_metrics_exporter.c
@@ -0,0 +1,336 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Metrics exporter go around each Fluent Bit subsystem and collect metrics
+ * in a fixed interval of time. This operation is atomic and happens as one
+ * event handled by the main event loop.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_http_server.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_metrics_exporter.h>
+
+static int collect_inputs(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
+ struct flb_config *ctx)
+{
+ int total = 0;
+ size_t s;
+ char *buf;
+ struct mk_list *head;
+ struct flb_input_instance *i;
+
+ msgpack_pack_str(mp_pck, 5);
+ msgpack_pack_str_body(mp_pck, "input", 5);
+
+ mk_list_foreach(head, &ctx->inputs) {
+ i = mk_list_entry(head, struct flb_input_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+ total++; /* FIXME: keep total number in cache */
+ }
+
+ msgpack_pack_map(mp_pck, total);
+ mk_list_foreach(head, &ctx->inputs) {
+ i = mk_list_entry(head, struct flb_input_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+
+ flb_metrics_dump_values(&buf, &s, i->metrics);
+ msgpack_pack_str(mp_pck, i->metrics->title_len);
+ msgpack_pack_str_body(mp_pck, i->metrics->title, i->metrics->title_len);
+ msgpack_sbuffer_write(mp_sbuf, buf, s);
+ flb_free(buf);
+ }
+
+ return 0;
+}
+
+static int collect_filters(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
+ struct flb_config *ctx)
+{
+ int total = 0;
+ size_t s;
+ char *buf;
+ struct mk_list *head;
+ struct flb_filter_instance *i;
+
+ msgpack_pack_str(mp_pck, 6);
+ msgpack_pack_str_body(mp_pck, "filter", 6);
+
+ mk_list_foreach(head, &ctx->filters) {
+ i = mk_list_entry(head, struct flb_filter_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+ total++;
+ }
+
+ msgpack_pack_map(mp_pck, total);
+ mk_list_foreach(head, &ctx->filters) {
+ i = mk_list_entry(head, struct flb_filter_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+
+ flb_metrics_dump_values(&buf, &s, i->metrics);
+ msgpack_pack_str(mp_pck, i->metrics->title_len);
+ msgpack_pack_str_body(mp_pck, i->metrics->title, i->metrics->title_len);
+ msgpack_sbuffer_write(mp_sbuf, buf, s);
+ flb_free(buf);
+ }
+
+ return 0;
+}
+
+static int collect_outputs(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
+ struct flb_config *ctx)
+{
+ int total = 0;
+ size_t s;
+ char *buf;
+ struct mk_list *head;
+ struct flb_output_instance *i;
+
+ msgpack_pack_str(mp_pck, 6);
+ msgpack_pack_str_body(mp_pck, "output", 6);
+
+ mk_list_foreach(head, &ctx->outputs) {
+ i = mk_list_entry(head, struct flb_output_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+ total++; /* FIXME: keep total number in cache */
+ }
+
+ msgpack_pack_map(mp_pck, total);
+ mk_list_foreach(head, &ctx->outputs) {
+ i = mk_list_entry(head, struct flb_output_instance, _head);
+ if (!i->metrics) {
+ continue;
+ }
+
+ flb_metrics_dump_values(&buf, &s, i->metrics);
+ msgpack_pack_str(mp_pck, i->metrics->title_len);
+ msgpack_pack_str_body(mp_pck, i->metrics->title, i->metrics->title_len);
+ msgpack_sbuffer_write(mp_sbuf, buf, s);
+ flb_free(buf);
+ }
+
+ return 0;
+}
+
+static int collect_metrics(struct flb_me *me)
+{
+ int ret;
+ int keys;
+ char *buf_data;
+ size_t buf_size;
+ struct flb_config *ctx = me->config;
+ struct cmt *cmt;
+
+ /*
+ * msgpack buffer for old-style /v1/metrics
+ * ----------------------------------------
+ */
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+
+ /* Prepare new outgoing buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ keys = 3; /* input, filter, output */
+ msgpack_pack_map(&mp_pck, keys);
+
+ /* Collect metrics from input instances */
+ collect_inputs(&mp_sbuf, &mp_pck, me->config);
+ collect_filters(&mp_sbuf, &mp_pck, me->config);
+ collect_outputs(&mp_sbuf, &mp_pck, me->config);
+
+ /*
+ * If the built-in HTTP server is enabled, push metrics and health checks
+ * ---------------------------------------------------------------------
+ */
+
+#ifdef FLB_HAVE_HTTP_SERVER
+ if (ctx->http_server == FLB_TRUE) {
+ /* /v1/metrics (old) */
+ flb_hs_push_pipeline_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size);
+
+ /* /v1/health */
+ if (ctx->health_check == FLB_TRUE) {
+ flb_hs_push_health_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size);
+ }
+
+ /* /v2/metrics: retrieve a CMetrics context with internal metrics */
+ cmt = flb_me_get_cmetrics(ctx);
+ if (cmt) {
+ /* encode context to msgpack */
+ ret = cmt_encode_msgpack_create(cmt, &buf_data, &buf_size);
+ if (ret == 0) {
+ flb_hs_push_metrics(ctx->http_ctx, buf_data, buf_size);
+ cmt_encode_msgpack_destroy(buf_data);
+ }
+ cmt_destroy(cmt);
+ }
+ }
+#endif
+
+ /* destroy msgpack buffer for old-style /v1/metrics */
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+
+ return 0;
+}
+
+/* Create metrics exporter context */
+struct flb_me *flb_me_create(struct flb_config *ctx)
+{
+ int fd;
+ struct mk_event *event;
+ struct flb_me *me;
+
+ /* Context */
+ me = flb_calloc(1, sizeof(struct flb_me));
+ if (!me) {
+ flb_errno();
+ return NULL;
+ }
+ me->config = ctx;
+
+ /* Initialize event loop context */
+ event = &me->event;
+ MK_EVENT_ZERO(event);
+
+ /* Run every one second */
+ fd = mk_event_timeout_create(ctx->evl, 1, 0, &me->event);
+ if (fd == -1) {
+ flb_error("[metrics_exporter] registration failed");
+ flb_free(me);
+ return NULL;
+ }
+ me->fd = fd;
+
+ return me;
+
+}
+
+/* Handle the event loop notification: "it's time to collect metrics" */
+int flb_me_fd_event(int fd, struct flb_me *me)
+{
+ if (fd != me->fd) {
+ return -1;
+ }
+
+ flb_utils_timer_consume(fd);
+ collect_metrics(me);
+
+ return 0;
+}
+
+int flb_me_destroy(struct flb_me *me)
+{
+ mk_event_timeout_destroy(me->config->evl, &me->event);
+ flb_free(me);
+ return 0;
+}
+
+/* Export all metrics as CMetrics context */
+struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_input_instance *i; /* inputs */
+ struct flb_filter_instance *f; /* filter */
+ struct flb_output_instance *o; /* output */
+ struct cmt *cmt;
+
+ cmt = cmt_create();
+ if (!cmt) {
+ return NULL;
+ }
+
+ /* Fluent Bit metrics */
+ flb_metrics_fluentbit_add(ctx, cmt);
+
+ if (ctx->storage_metrics == FLB_TRUE) {
+ /*
+ * Storage metrics are updated in two places:
+ *
+ * - global metrics: updated by using flb_storage_metrics_update()
+ * - input: flb_storage callback update the metrics automatically every 5 seconds
+ *
+ * In this part, we only take care about the global storage metrics.
+ */
+ flb_storage_metrics_update(ctx, ctx->storage_metrics_ctx);
+ ret = cmt_cat(cmt, ctx->storage_metrics_ctx->cmt);
+ if (ret == -1) {
+ flb_error("[metrics exporter] could not append global storage_metrics");
+ cmt_destroy(cmt);
+ return NULL;
+ }
+ }
+
+ /* Pipeline metrics: input, filters, outputs */
+ mk_list_foreach(head, &ctx->inputs) {
+ i = mk_list_entry(head, struct flb_input_instance, _head);
+ ret = cmt_cat(cmt, i->cmt);
+ if (ret == -1) {
+ flb_error("[metrics exporter] could not append metrics from %s",
+ flb_input_name(i));
+ cmt_destroy(cmt);
+ return NULL;
+ }
+ }
+
+ mk_list_foreach(head, &ctx->filters) {
+ f = mk_list_entry(head, struct flb_filter_instance, _head);
+ ret = cmt_cat(cmt, f->cmt);
+ if (ret == -1) {
+ flb_error("[metrics exporter] could not append metrics from %s",
+ flb_filter_name(f));
+ cmt_destroy(cmt);
+ return NULL;
+ }
+ }
+
+ mk_list_foreach(head, &ctx->outputs) {
+ o = mk_list_entry(head, struct flb_output_instance, _head);
+ ret = cmt_cat(cmt, o->cmt);
+ if (ret == -1) {
+ flb_error("[metrics exporter] could not append metrics from %s",
+ flb_output_name(o));
+ cmt_destroy(cmt);
+ return NULL;
+ }
+ }
+
+ return cmt;
+}