summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_prometheus_exporter/prom.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_prometheus_exporter/prom.c')
-rw-r--r--src/fluent-bit/plugins/out_prometheus_exporter/prom.c298
1 files changed, 298 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_prometheus_exporter/prom.c b/src/fluent-bit/plugins/out_prometheus_exporter/prom.c
new file mode 100644
index 000000000..d471d2bab
--- /dev/null
+++ b/src/fluent-bit/plugins/out_prometheus_exporter/prom.c
@@ -0,0 +1,298 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_metrics.h>
+
+#include "prom.h"
+#include "prom_http.h"
+
+static int config_add_labels(struct flb_output_instance *ins,
+ struct prom_exporter *ctx)
+{
+ struct mk_list *head;
+ struct flb_config_map_val *mv;
+ struct flb_slist_entry *k = NULL;
+ struct flb_slist_entry *v = NULL;
+ struct flb_kv *kv;
+
+ if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
+ return 0;
+ }
+
+ /* iterate all 'add_label' definitions */
+ flb_config_map_foreach(head, mv, ctx->add_labels) {
+ if (mk_list_size(mv->val.list) != 2) {
+ flb_plg_error(ins, "'add_label' expects a key and a value, "
+ "e.g: 'add_label version 1.8.0'");
+ return -1;
+ }
+
+ k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
+ v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
+
+ kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
+ if (!kv) {
+ flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int cb_prom_init(struct flb_output_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ struct prom_exporter *ctx;
+
+ flb_output_net_default("0.0.0.0", 2021 , ins);
+
+ ctx = flb_calloc(1, sizeof(struct prom_exporter));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = ins;
+ flb_kv_init(&ctx->kv_labels);
+ flb_output_set_context(ins, ctx);
+
+ /* Load config map */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Parse 'add_label' */
+ ret = config_add_labels(ins, ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* HTTP Server context */
+ ctx->http = prom_http_server_create(ctx,
+ ins->host.name, ins->host.port, config);
+ if (!ctx->http) {
+ flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting");
+ return -1;
+ }
+
+ /* Hash table for metrics */
+ ctx->ht_metrics = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 32, 0);
+ if (!ctx->ht_metrics) {
+ flb_plg_error(ctx->ins, "could not initialize hash table for metrics");
+ return -1;
+ }
+
+ /* Start HTTP Server */
+ ret = prom_http_server_start(ctx->http);
+ if (ret == -1) {
+ return -1;
+ }
+
+ flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d",
+ ins->host.name, ins->host.port);
+ return 0;
+}
+
+static void append_labels(struct prom_exporter *ctx, struct cmt *cmt)
+{
+ struct flb_kv *kv;
+ struct mk_list *head;
+
+ mk_list_foreach(head, &ctx->kv_labels) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ cmt_label_add(cmt, kv->key, kv->val);
+ }
+}
+
+static int hash_store(struct prom_exporter *ctx, struct flb_input_instance *ins,
+ cfl_sds_t buf)
+{
+ int ret;
+ int len;
+
+ len = strlen(ins->name);
+
+ /* store/override the content into the hash table */
+ ret = flb_hash_table_add(ctx->ht_metrics, ins->name, len,
+ buf, cfl_sds_len(buf));
+ if (ret < 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static flb_sds_t hash_format_metrics(struct prom_exporter *ctx)
+{
+ int size = 2048;
+ flb_sds_t buf;
+
+ struct mk_list *head;
+ struct flb_hash_table_entry *entry;
+
+
+ buf = flb_sds_create_size(size);
+ if (!buf) {
+ return NULL;
+ }
+
+ /* Take every hash entry and compose one buffer with the whole content */
+ mk_list_foreach(head, &ctx->ht_metrics->entries) {
+ entry = mk_list_entry(head, struct flb_hash_table_entry, _head_parent);
+ flb_sds_cat_safe(&buf, entry->val, entry->val_size);
+ }
+
+ return buf;
+}
+
+static void cb_prom_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *ins, void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ int add_ts;
+ size_t off = 0;
+ flb_sds_t metrics;
+ cfl_sds_t text;
+ struct cmt *cmt;
+ struct prom_exporter *ctx = out_context;
+
+ /*
+ * A new set of metrics has arrived, perform decoding, apply labels,
+ * convert to Prometheus text format and store the output in the
+ * hash table for metrics.
+ */
+ ret = cmt_decode_msgpack_create(&cmt,
+ (char *) event_chunk->data,
+ event_chunk->size, &off);
+ if (ret != 0) {
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ /* append labels set by config */
+ append_labels(ctx, cmt);
+
+ /* add timestamp in the output format ? */
+ if (ctx->add_timestamp) {
+ add_ts = CMT_TRUE;
+ }
+ else {
+ add_ts = CMT_FALSE;
+ }
+
+ /* convert to text representation */
+ text = cmt_encode_prometheus_create(cmt, add_ts);
+ if (!text) {
+ cmt_destroy(cmt);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ cmt_destroy(cmt);
+
+ if (cfl_sds_len(text) == 0) {
+ flb_plg_debug(ctx->ins, "context without metrics (empty)");
+ cmt_encode_text_destroy(text);
+ FLB_OUTPUT_RETURN(FLB_OK);
+ }
+
+ /* register payload of metrics / override previous one */
+ ret = hash_store(ctx, ins, text);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not store metrics coming from: %s",
+ flb_input_name(ins));
+ cmt_encode_prometheus_destroy(text);
+ cmt_destroy(cmt);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ cmt_encode_prometheus_destroy(text);
+
+ /* retrieve a full copy of all metrics */
+ metrics = hash_format_metrics(ctx);
+ if (!metrics) {
+ flb_plg_error(ctx->ins, "could not retrieve metrics");
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ /* push new (full) metrics payload */
+ ret = prom_http_server_mq_push_metrics(ctx->http,
+ (char *) metrics,
+ flb_sds_len(metrics));
+ flb_sds_destroy(metrics);
+
+ if (ret != 0) {
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_prom_exit(void *data, struct flb_config *config)
+{
+ struct prom_exporter *ctx = data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->ht_metrics) {
+ flb_hash_table_destroy(ctx->ht_metrics);
+ }
+
+ flb_kv_release(&ctx->kv_labels);
+ prom_http_server_stop(ctx->http);
+ prom_http_server_destroy(ctx->http);
+ flb_free(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_BOOL, "add_timestamp", "false",
+ 0, FLB_TRUE, offsetof(struct prom_exporter, add_timestamp),
+ "Add timestamp to every metric honoring collection time."
+ },
+
+ {
+ FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct prom_exporter, add_labels),
+ "TCP port for listening for HTTP connections."
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_output_plugin out_prometheus_exporter_plugin = {
+ .name = "prometheus_exporter",
+ .description = "Prometheus Exporter",
+ .cb_init = cb_prom_init,
+ .cb_flush = cb_prom_flush,
+ .cb_exit = cb_prom_exit,
+ .flags = FLB_OUTPUT_NET,
+ .event_type = FLB_OUTPUT_METRICS,
+ .config_map = config_map,
+};