summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/http_server
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--fluent-bit/src/http_server/CMakeLists.txt20
-rw-r--r--fluent-bit/src/http_server/api/v1/CMakeLists.txt20
-rw-r--r--fluent-bit/src/http_server/api/v1/health.c335
-rw-r--r--fluent-bit/src/http_server/api/v1/health.h73
-rw-r--r--fluent-bit/src/http_server/api/v1/metrics.c579
-rw-r--r--fluent-bit/src/http_server/api/v1/metrics.h30
-rw-r--r--fluent-bit/src/http_server/api/v1/plugins.c109
-rw-r--r--fluent-bit/src/http_server/api/v1/plugins.h28
-rw-r--r--fluent-bit/src/http_server/api/v1/register.c49
-rw-r--r--fluent-bit/src/http_server/api/v1/register.h28
-rw-r--r--fluent-bit/src/http_server/api/v1/storage.c204
-rw-r--r--fluent-bit/src/http_server/api/v1/storage.h28
-rw-r--r--fluent-bit/src/http_server/api/v1/trace.c615
-rw-r--r--fluent-bit/src/http_server/api/v1/trace.h28
-rw-r--r--fluent-bit/src/http_server/api/v1/uptime.c111
-rw-r--r--fluent-bit/src/http_server/api/v1/uptime.h28
-rw-r--r--fluent-bit/src/http_server/api/v2/CMakeLists.txt10
-rw-r--r--fluent-bit/src/http_server/api/v2/metrics.c259
-rw-r--r--fluent-bit/src/http_server/api/v2/metrics.h28
-rw-r--r--fluent-bit/src/http_server/api/v2/register.c31
-rw-r--r--fluent-bit/src/http_server/api/v2/register.h28
-rw-r--r--fluent-bit/src/http_server/api/v2/reload.c161
-rw-r--r--fluent-bit/src/http_server/api/v2/reload.h28
-rw-r--r--fluent-bit/src/http_server/flb_hs.c147
-rw-r--r--fluent-bit/src/http_server/flb_hs_endpoints.c121
-rw-r--r--fluent-bit/src/http_server/flb_hs_utils.c48
26 files changed, 3146 insertions, 0 deletions
diff --git a/fluent-bit/src/http_server/CMakeLists.txt b/fluent-bit/src/http_server/CMakeLists.txt
new file mode 100644
index 00000000..acded936
--- /dev/null
+++ b/fluent-bit/src/http_server/CMakeLists.txt
@@ -0,0 +1,20 @@
+if(NOT FLB_METRICS)
+ message(FATAL_ERROR "FLB_HTTP_SERVER requires FLB_METRICS=On.")
+endif()
+
+# Core Source
+set(src
+ flb_hs.c
+ flb_hs_endpoints.c
+ flb_hs_utils.c
+ )
+
+# api/v1
+add_subdirectory(api/v1)
+
+# api/v2
+add_subdirectory(api/v2)
+
+include_directories(${MONKEY_INCLUDE_DIR})
+add_library(flb-http-server STATIC ${src})
+target_link_libraries(flb-http-server monkey-core-static api-v1 api-v2)
diff --git a/fluent-bit/src/http_server/api/v1/CMakeLists.txt b/fluent-bit/src/http_server/api/v1/CMakeLists.txt
new file mode 100644
index 00000000..af86e43f
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/CMakeLists.txt
@@ -0,0 +1,20 @@
+# api/v1
+set(src
+ uptime.c
+ metrics.c
+ storage.c
+ plugins.c
+ register.c
+ health.c
+ )
+
+if(FLB_CHUNK_TRACE)
+ set(src
+ ${src}
+ trace.c
+ )
+endif()
+
+include_directories(${MONKEY_INCLUDE_DIR})
+add_library(api-v1 STATIC ${src})
+target_link_libraries(api-v1 monkey-core-static fluent-bit-static)
diff --git a/fluent-bit/src/http_server/api/v1/health.c b/fluent-bit/src/http_server/api/v1/health.c
new file mode 100644
index 00000000..713d4b87
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/health.c
@@ -0,0 +1,335 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include<stdio.h>
+#include <stdlib.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_http_server.h>
+#include <msgpack.h>
+
+#include "health.h"
+
+struct flb_health_check_metrics_counter *metrics_counter;
+
+pthread_key_t hs_health_key;
+
+static struct mk_list *hs_health_key_create()
+{
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = flb_malloc(sizeof(struct mk_list));
+ if (!metrics_list) {
+ flb_errno();
+ return NULL;
+ }
+ mk_list_init(metrics_list);
+ pthread_setspecific(hs_health_key, metrics_list);
+
+ return metrics_list;
+}
+
+static void hs_health_key_destroy(void *data)
+{
+ struct mk_list *metrics_list = (struct mk_list*)data;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_hs_hc_buf *entry;
+
+ if (metrics_list == NULL) {
+ return;
+ }
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_hc_buf, _head);
+ if (entry != NULL) {
+ mk_list_del(&entry->_head);
+ flb_free(entry);
+ }
+ }
+
+ flb_free(metrics_list);
+}
+
+/* initialize the metrics counters */
+static void counter_init(struct flb_hs *hs) {
+
+ metrics_counter = flb_malloc(sizeof(struct flb_health_check_metrics_counter));
+
+ if (!metrics_counter) {
+ flb_errno();
+ return;
+ }
+
+ metrics_counter->error_counter = 0;
+ metrics_counter->retry_failure_counter = 0;
+ metrics_counter->error_limit = hs->config->hc_errors_count;
+ metrics_counter->retry_failure_limit = hs->config->hc_retry_failure_count;
+ metrics_counter->period_counter = 0;
+ metrics_counter->period_limit = hs->config->health_check_period;
+
+}
+
+/*
+* tell what's the current status for health check
+* One default background is that the metrics received and saved into
+* message queue every time is a accumulation of error numbers,
+* not a error number in recent second. So to get the error number
+* in a period, we need to use:
+* the error number of the newest metrics message minus
+* the error number in oldest metrics of period
+*/
+static int is_healthy() {
+
+ struct mk_list *metrics_list;
+ struct flb_hs_hc_buf *buf;
+ int period_errors;
+ int period_retry_failure;
+
+ metrics_list = pthread_getspecific(hs_health_key);
+ if (metrics_list == NULL) {
+ metrics_list = hs_health_key_create();
+ if (metrics_list == NULL) {
+ return FLB_FALSE;
+ }
+ }
+
+ if (mk_list_is_empty(metrics_list) == 0) {
+ return FLB_TRUE;
+ }
+
+ /* Get the error metrics entry from the start time of current period */
+ buf = mk_list_entry_first(metrics_list, struct flb_hs_hc_buf, _head);
+
+ /*
+ * increase user so clean up function won't
+ * free the memory and delete the data
+ */
+ buf->users++;
+
+ /* the error count saved in message queue is the number of
+ * error count at that time. So the math is that:
+ * the error count in current period = (current error count in total) -
+ * (begin error count in the period)
+ */
+ period_errors = metrics_counter->error_counter - buf->error_count;
+ period_retry_failure = metrics_counter->retry_failure_counter -
+ buf->retry_failure_count;
+ buf->users--;
+
+ if (period_errors > metrics_counter->error_limit ||
+ period_retry_failure > metrics_counter->retry_failure_limit) {
+
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+/* read the metrics from message queue and update the counter*/
+static void read_metrics(void *data, size_t size, int* error_count,
+ int* retry_failure_count)
+{
+ int i;
+ int j;
+ int m;
+ msgpack_unpacked result;
+ msgpack_object map;
+ size_t off = 0;
+ int errors = 0;
+ int retry_failure = 0;
+
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, data, size, &off);
+ map = result.data;
+
+ for (i = 0; i < map.via.map.size; i++) {
+ msgpack_object k;
+ msgpack_object v;
+
+ /* Keys: input, output */
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+ if (k.via.str.size != sizeof("output") - 1 ||
+ strncmp(k.via.str.ptr, "output", k.via.str.size) != 0) {
+
+ continue;
+ }
+ /* Iterate sub-map */
+ for (j = 0; j < v.via.map.size; j++) {
+ msgpack_object sv;
+
+ /* Keys: plugin name , values: metrics */
+ sv = v.via.map.ptr[j].val;
+
+ for (m = 0; m < sv.via.map.size; m++) {
+ msgpack_object mk;
+ msgpack_object mv;
+
+ mk = sv.via.map.ptr[m].key;
+ mv = sv.via.map.ptr[m].val;
+
+ if (mk.via.str.size == sizeof("errors") - 1 &&
+ strncmp(mk.via.str.ptr, "errors", mk.via.str.size) == 0) {
+ errors += mv.via.u64;
+ }
+ else if (mk.via.str.size == sizeof("retries_failed") - 1 &&
+ strncmp(mk.via.str.ptr, "retries_failed",
+ mk.via.str.size) == 0) {
+ retry_failure += mv.via.u64;
+ }
+ }
+ }
+ }
+
+ *error_count = errors;
+ *retry_failure_count = retry_failure;
+ msgpack_unpacked_destroy(&result);
+}
+
+/*
+* Delete unused metrics, note that we only care about the latest node
+* we use this function to maintain the metrics queue only save the metrics
+* in a period. The old metrics which is out of period will be removed
+*/
+static int cleanup_metrics()
+{
+ int c = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *metrics_list;
+ struct flb_hs_hc_buf *entry;
+
+ metrics_list = pthread_getspecific(hs_health_key);
+ if (!metrics_list) {
+ return -1;
+ }
+
+ if (metrics_counter->period_counter < metrics_counter->period_limit) {
+ return 0;
+ }
+
+ /* remove the oldest metrics if it's out of period */
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_hc_buf, _head);
+ if (metrics_counter->period_counter > metrics_counter->period_limit &&
+ entry->users == 0) {
+ metrics_counter->period_counter--;
+ mk_list_del(&entry->_head);
+ flb_free(entry);
+ c++;
+ }
+ else {
+ break;
+ }
+ }
+
+ return c;
+}
+
+/*
+ * Callback invoked every time some metrics are received through a
+ * message queue channel. This function runs in a Monkey HTTP thread
+ * worker and it purpose is to take the metrics data and record the health
+ * status based on the metrics.
+ * This happens every second based on the event config.
+ * So we treat period_counter to count the time.
+ * And we maintain a message queue with the size of period limit number
+ * so every time we get a new metrics data in, if the message queue size is
+ * large than period limit, we will do the clean up func to
+ * remove the oldest metrics.
+ */
+static void cb_mq_health(mk_mq_t *queue, void *data, size_t size)
+{
+ struct flb_hs_hc_buf *buf;
+ struct mk_list *metrics_list = NULL;
+ int error_count = 0;
+ int retry_failure_count = 0;
+
+ metrics_list = pthread_getspecific(hs_health_key);
+
+ if (metrics_list == NULL) {
+ metrics_list = hs_health_key_create();
+ if (metrics_list == NULL) {
+ return;
+ }
+ }
+
+ metrics_counter->period_counter++;
+
+ /* this is to remove the metrics out of period*/
+ cleanup_metrics();
+
+ buf = flb_malloc(sizeof(struct flb_hs_hc_buf));
+ if (!buf) {
+ flb_errno();
+ return;
+ }
+
+ buf->users = 0;
+
+ read_metrics(data, size, &error_count, &retry_failure_count);
+
+ metrics_counter->error_counter = error_count;
+ metrics_counter->retry_failure_counter = retry_failure_count;
+
+ buf->error_count = error_count;
+ buf->retry_failure_count = retry_failure_count;
+
+ mk_list_add(&buf->_head, metrics_list);
+}
+
+/* API: Get fluent Bit Health Status */
+static void cb_health(mk_request_t *request, void *data)
+{
+ int status = is_healthy();
+
+ if (status == FLB_TRUE) {
+ mk_http_status(request, 200);
+ mk_http_send(request, "ok\n", strlen("ok\n"), NULL);
+ mk_http_done(request);
+ }
+ else {
+ mk_http_status(request, 500);
+ mk_http_send(request, "error\n", strlen("error\n"), NULL);
+ mk_http_done(request);
+ }
+}
+
+/* Perform registration */
+int api_v1_health(struct flb_hs *hs)
+{
+
+ pthread_key_create(&hs_health_key, hs_health_key_destroy);
+
+ counter_init(hs);
+ /* Create a message queue */
+ hs->qid_health = mk_mq_create(hs->ctx, "/health",
+ cb_mq_health, NULL);
+
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/health", cb_health, hs);
+ return 0;
+}
+
+void flb_hs_health_destroy()
+{
+ flb_free(metrics_counter);
+}
diff --git a/fluent-bit/src/http_server/api/v1/health.h b/fluent-bit/src/http_server/api/v1/health.h
new file mode 100644
index 00000000..27a826f4
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/health.h
@@ -0,0 +1,73 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+#ifndef FLB_HS_API_V1_HEALTH_H
+#define FLB_HS_API_V1_HEALTH_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+struct flb_health_check_metrics_counter {
+
+ /*
+ * health check error limit,
+ * setup by customer through config: HC_Errors_Count
+ */
+ int error_limit;
+
+ /* counter the error number in metrics*/
+ int error_counter;
+
+ /*
+ * health check retry failed limit,
+ * setup by customer through config: HC_Retry_Failure_Count
+ */
+ int retry_failure_limit;
+
+ /* count the retry failed number in metrics*/
+ int retry_failure_counter;
+
+ /*period limit, setup by customer through config: HC_Period*/
+ int period_limit;
+
+ /* count the seconds in one period*/
+ int period_counter;
+
+};
+
+
+/*
+ * error and retry failure buffers that contains certain cached data to be used
+ * by health check.
+ */
+struct flb_hs_hc_buf {
+ int users;
+ int error_count;
+ int retry_failure_count;
+ struct mk_list _head;
+};
+
+/* health endpoint*/
+int api_v1_health(struct flb_hs *hs);
+
+/* clean up health resource when shutdown*/
+void flb_hs_health_destroy();
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/metrics.c b/fluent-bit/src/http_server/api/v1/metrics.c
new file mode 100644
index 00000000..4a541eaa
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/metrics.c
@@ -0,0 +1,579 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_time.h>
+#include "metrics.h"
+
+#include <fluent-bit/flb_http_server.h>
+#include <msgpack.h>
+
+#define null_check(x) do { if (!x) { goto error; } else {sds = x;} } while (0)
+
+pthread_key_t hs_metrics_key;
+
+static struct mk_list *hs_metrics_key_create()
+{
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = flb_malloc(sizeof(struct mk_list));
+ if (metrics_list == NULL) {
+ flb_errno();
+ return NULL;
+ }
+ mk_list_init(metrics_list);
+ pthread_setspecific(hs_metrics_key, metrics_list);
+
+ return metrics_list;
+}
+
+static void hs_metrics_key_destroy(void *data)
+{
+ struct mk_list *metrics_list = (struct mk_list*)data;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_hs_buf *entry;
+
+ if (metrics_list == NULL) {
+ return;
+ }
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != NULL) {
+ if (entry->raw_data != NULL) {
+ flb_free(entry->raw_data);
+ entry->raw_data = NULL;
+ }
+ if (entry->data) {
+ flb_sds_destroy(entry->data);
+ entry->data = NULL;
+ }
+ mk_list_del(&entry->_head);
+ flb_free(entry);
+ }
+ }
+
+ flb_free(metrics_list);
+}
+
+/* Return the newest metrics buffer */
+static struct flb_hs_buf *metrics_get_latest()
+{
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list;
+
+ metrics_list = pthread_getspecific(hs_metrics_key);
+ if (!metrics_list) {
+ return NULL;
+ }
+
+ if (mk_list_size(metrics_list) == 0) {
+ return NULL;
+ }
+
+ buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head);
+ return buf;
+}
+
+/* Delete unused metrics, note that we only care about the latest node */
+static int cleanup_metrics()
+{
+ int c = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *metrics_list;
+ struct flb_hs_buf *last;
+ struct flb_hs_buf *entry;
+
+ metrics_list = pthread_getspecific(hs_metrics_key);
+ if (!metrics_list) {
+ return -1;
+ }
+
+ last = metrics_get_latest();
+ if (!last) {
+ return -1;
+ }
+
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != last && entry->users == 0) {
+ mk_list_del(&entry->_head);
+ flb_sds_destroy(entry->data);
+ flb_free(entry->raw_data);
+ flb_free(entry);
+ c++;
+ }
+ }
+
+ return c;
+}
+
+/*
+ * Callback invoked every time some metrics are received through a
+ * message queue channel. This function runs in a Monkey HTTP thread
+ * worker and it purpose is to take the metrics data and store it
+ * somewhere so then it can be available by the end-points upon
+ * HTTP client requests.
+ */
+static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size)
+{
+ flb_sds_t out_data;
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = pthread_getspecific(hs_metrics_key);
+ if (!metrics_list) {
+ metrics_list = hs_metrics_key_create();
+ if (metrics_list == NULL) {
+ return;
+ }
+ }
+
+ /* Convert msgpack to JSON */
+ out_data = flb_msgpack_raw_to_json_sds(data, size);
+ if (!out_data) {
+ return;
+ }
+
+ buf = flb_malloc(sizeof(struct flb_hs_buf));
+ if (!buf) {
+ flb_errno();
+ flb_sds_destroy(out_data);
+ return;
+ }
+ buf->users = 0;
+ buf->data = out_data;
+
+ buf->raw_data = flb_malloc(size);
+ if (!buf->raw_data) {
+ flb_errno();
+ flb_sds_destroy(out_data);
+ flb_free(buf);
+ return;
+ }
+ memcpy(buf->raw_data, data, size);
+ buf->raw_size = size;
+
+ mk_list_add(&buf->_head, metrics_list);
+
+ cleanup_metrics();
+}
+
+int string_cmp(const void* a_arg, const void* b_arg) {
+ char *a = *(char **)a_arg;
+ char *b = *(char **)b_arg;
+
+ return strcmp(a, b);
+}
+
+size_t extract_metric_name_end_position(char *s) {
+ int i;
+
+ for (i = 0; i < flb_sds_len(s); i++) {
+ if (s[i] == '{') {
+ return i;
+ }
+ }
+ return 0;
+}
+
+int is_same_metric(char *s1, char *s2) {
+ int i;
+ int p1 = extract_metric_name_end_position(s1);
+ int p2 = extract_metric_name_end_position(s2);
+
+ if (p1 != p2) {
+ return 0;
+ }
+
+ for (i = 0; i < p1; i++) {
+ if (s1[i] != s2[i]) {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+/* derive HELP text from metricname */
+/* if help text length > 128, increase init memory for metric_helptxt */
+flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt)
+{
+ if (strstr(metric_name, "input_bytes")) {
+ return flb_sds_cat(*metric_helptxt, " Number of input bytes.\n", 24);
+ }
+ else if (strstr(metric_name, "input_records")) {
+ return flb_sds_cat(*metric_helptxt, " Number of input records.\n", 26);
+ }
+ else if (strstr(metric_name, "output_bytes")) {
+ return flb_sds_cat(*metric_helptxt, " Number of output bytes.\n", 25);
+ }
+ else if (strstr(metric_name, "output_records")) {
+ return flb_sds_cat(*metric_helptxt, " Number of output records.\n", 27);
+ }
+ else if (strstr(metric_name, "output_errors")) {
+ return flb_sds_cat(*metric_helptxt, " Number of output errors.\n", 26);
+ }
+ else if (strstr(metric_name, "output_retries_failed")) {
+ return flb_sds_cat(*metric_helptxt, " Number of abandoned batches because the maximum number of re-tries was reached.\n", 81);
+ }
+ else if (strstr(metric_name, "output_retries")) {
+ return flb_sds_cat(*metric_helptxt, " Number of output retries.\n", 27);
+ }
+ else if (strstr(metric_name, "output_proc_records")) {
+ return flb_sds_cat(*metric_helptxt, " Number of processed output records.\n", 37);
+ }
+ else if (strstr(metric_name, "output_proc_bytes")) {
+ return flb_sds_cat(*metric_helptxt, " Number of processed output bytes.\n", 35);
+ }
+ else if (strstr(metric_name, "output_dropped_records")) {
+ return flb_sds_cat(*metric_helptxt, " Number of dropped records.\n", 28);
+ }
+ else if (strstr(metric_name, "output_retried_records")) {
+ return flb_sds_cat(*metric_helptxt, " Number of retried records.\n", 28);
+ }
+ else {
+ return (flb_sds_cat(*metric_helptxt, " Fluentbit metrics.\n", 20));
+ }
+}
+
+/* API: expose metrics in Prometheus format /api/v1/metrics/prometheus */
+void cb_metrics_prometheus(mk_request_t *request, void *data)
+{
+ int i;
+ int j;
+ int m;
+ int len;
+ int time_len;
+ int start_time_len;
+ uint64_t uptime;
+ size_t index;
+ size_t num_metrics = 0;
+ long now;
+ flb_sds_t sds;
+ flb_sds_t sds_metric;
+ flb_sds_t tmp_sds;
+ struct flb_sds *metric_helptxt_head;
+ flb_sds_t metric_helptxt;
+ size_t off = 0;
+ struct flb_hs_buf *buf;
+ msgpack_unpacked result;
+ msgpack_object map;
+ char tmp[32];
+ char time_str[64];
+ char start_time_str[64];
+ char* *metrics_arr;
+ struct flb_time tp;
+ struct flb_hs *hs = data;
+ struct flb_config *config = hs->config;
+
+ buf = metrics_get_latest();
+ if (!buf) {
+ mk_http_status(request, 404);
+ mk_http_done(request);
+ return;
+ }
+
+ /* ref count */
+ buf->users++;
+
+ /* Compose outgoing buffer string */
+ sds = flb_sds_create_size(1024);
+ if (!sds) {
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ buf->users--;
+ return;
+ }
+
+ /* length of HELP text */
+ metric_helptxt = flb_sds_create_size(128);
+ if (!metric_helptxt) {
+ flb_sds_destroy(sds);
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ buf->users--;
+ return;
+ }
+ metric_helptxt_head = FLB_SDS_HEADER(metric_helptxt);
+
+ /*
+ * fluentbit_input_records[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP
+ * fluentbit_input_bytes[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP
+ */
+ index = 0;
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, buf->raw_data, buf->raw_size, &off);
+ map = result.data;
+
+ /* we need to know number of exposed metrics to reserve a memory */
+ for (i = 0; i < map.via.map.size; i++) {
+ msgpack_object v = map.via.map.ptr[i].val;
+ /* Iterate sub-map */
+ for (j = 0; j < v.via.map.size; j++) {
+ msgpack_object sv = v.via.map.ptr[j].val;
+ for (m = 0; m < sv.via.map.size; m++) {
+ num_metrics++;
+ }
+ }
+ }
+ metrics_arr = flb_malloc(num_metrics * sizeof(char*));
+ if (!metrics_arr) {
+ flb_errno();
+
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ buf->users--;
+
+ flb_sds_destroy(sds);
+ flb_sds_destroy(metric_helptxt);
+ msgpack_unpacked_destroy(&result);
+ return;
+ }
+
+ flb_time_get(&tp);
+ now = flb_time_to_nanosec(&tp) / 1000000; /* in milliseconds */
+ time_len = snprintf(time_str, sizeof(time_str) - 1, "%lu", now);
+
+ for (i = 0; i < map.via.map.size; i++) {
+ msgpack_object k;
+ msgpack_object v;
+
+ /* Keys: input, output */
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+
+ /* Iterate sub-map */
+ for (j = 0; j < v.via.map.size; j++) {
+ msgpack_object sk;
+ msgpack_object sv;
+
+ /* Keys: plugin name , values: metrics */
+ sk = v.via.map.ptr[j].key;
+ sv = v.via.map.ptr[j].val;
+
+ for (m = 0; m < sv.via.map.size; m++) {
+ msgpack_object mk;
+ msgpack_object mv;
+
+ mk = sv.via.map.ptr[m].key;
+ mv = sv.via.map.ptr[m].val;
+
+ /* Convert metric value to string */
+ len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 " ", mv.via.u64);
+ if (len < 0) {
+ goto error;
+ }
+
+ /* Allocate buffer */
+ sds_metric = flb_sds_create_size(k.via.str.size
+ + mk.via.str.size
+ + sk.via.str.size
+ + len + time_len + 28);
+ if (sds_metric == NULL) {
+ goto error;
+ }
+
+ sds_metric = flb_sds_cat(sds_metric, "fluentbit_", 10);
+ sds_metric = flb_sds_cat(sds_metric, k.via.str.ptr, k.via.str.size);
+ sds_metric = flb_sds_cat(sds_metric, "_", 1);
+ sds_metric = flb_sds_cat(sds_metric, mk.via.str.ptr, mk.via.str.size);
+ sds_metric = flb_sds_cat(sds_metric, "_total{name=\"", 13);
+ sds_metric = flb_sds_cat(sds_metric, sk.via.str.ptr, sk.via.str.size);
+ sds_metric = flb_sds_cat(sds_metric, "\"} ", 3);
+ sds_metric = flb_sds_cat(sds_metric, tmp, len);
+ sds_metric = flb_sds_cat(sds_metric, time_str, time_len);
+ sds_metric = flb_sds_cat(sds_metric, "\n", 1);
+ metrics_arr[index] = sds_metric;
+ index++;
+ }
+ }
+ }
+
+ /* Sort metrics in alphabetic order, so we can group them later. */
+ qsort(metrics_arr, num_metrics, sizeof(char *), string_cmp);
+
+ /* When a new metric starts add HELP and TYPE annotation. */
+ tmp_sds = flb_sds_cat(sds, "# HELP ", 7);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0]));
+ null_check(tmp_sds);
+ if (!metrics_help_txt(metrics_arr[0], &metric_helptxt)) {
+ goto error;
+ }
+ tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "# TYPE ", 7);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0]));
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, " counter\n", 9);
+ null_check(tmp_sds);
+
+ for (i = 0; i < num_metrics; i++) {
+ tmp_sds = flb_sds_cat(sds, metrics_arr[i], strlen(metrics_arr[i]));
+ null_check(tmp_sds);
+ if ((i != num_metrics - 1) && (is_same_metric(metrics_arr[i], metrics_arr[i+1]) == 0)) {
+ tmp_sds = flb_sds_cat(sds, "# HELP ", 7);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1]));
+ null_check(tmp_sds);
+ metric_helptxt_head->len = 0;
+ if (!metrics_help_txt(metrics_arr[i+1], &metric_helptxt)) {
+ goto error;
+ }
+ tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "# TYPE ", 7);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1]));
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, " counter\n", 9);
+ null_check(tmp_sds);
+ }
+ }
+
+ /* Attach uptime */
+ uptime = time(NULL) - config->init_time;
+ len = snprintf(time_str, sizeof(time_str) - 1, "%lu", uptime);
+
+ tmp_sds = flb_sds_cat(sds,
+ "# HELP fluentbit_uptime Number of seconds that Fluent Bit has "
+ "been running.\n", 76);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_uptime counter\n", 32);
+ null_check(tmp_sds);
+
+ tmp_sds = flb_sds_cat(sds, "fluentbit_uptime ", 17);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, time_str, len);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "\n", 1);
+ null_check(tmp_sds);
+
+ /* Attach process_start_time_seconds metric. */
+ start_time_len = snprintf(start_time_str, sizeof(start_time_str) - 1,
+ "%lu", config->init_time);
+
+ tmp_sds = flb_sds_cat(sds, "# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n", 89);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "# TYPE process_start_time_seconds gauge\n", 40);
+ null_check(tmp_sds);
+
+ tmp_sds = flb_sds_cat(sds, "process_start_time_seconds ", 27);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, start_time_str, start_time_len);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "\n", 1);
+ null_check(tmp_sds);
+
+ /* Attach fluentbit_build_info metric. */
+ tmp_sds = flb_sds_cat(sds, "# HELP fluentbit_build_info Build version information.\n", 55);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_build_info gauge\n", 34);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "fluentbit_build_info{version=\"", 30);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1);
+ null_check(tmp_sds);
+ tmp_sds = flb_sds_cat(sds, "\",edition=\"", 11);
+ null_check(tmp_sds);
+#ifdef FLB_ENTERPRISE
+ tmp_sds = flb_sds_cat(sds, "Enterprise\"} 1\n", 15);
+ null_check(tmp_sds);
+#else
+ tmp_sds = flb_sds_cat(sds, "Community\"} 1\n", 14);
+ null_check(tmp_sds);
+#endif
+
+ msgpack_unpacked_destroy(&result);
+ buf->users--;
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS);
+ mk_http_send(request, sds, flb_sds_len(sds), NULL);
+ for (i = 0; i < num_metrics; i++) {
+ flb_sds_destroy(metrics_arr[i]);
+ }
+ flb_free(metrics_arr);
+ flb_sds_destroy(sds);
+ flb_sds_destroy(metric_helptxt);
+
+ mk_http_done(request);
+ return;
+
+error:
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ buf->users--;
+
+ for (i = 0; i < index; i++) {
+ flb_sds_destroy(metrics_arr[i]);
+ }
+ flb_free(metrics_arr);
+ flb_sds_destroy(sds);
+ flb_sds_destroy(metric_helptxt);
+ msgpack_unpacked_destroy(&result);
+}
+
+/* API: expose built-in metrics /api/v1/metrics */
+static void cb_metrics(mk_request_t *request, void *data)
+{
+ struct flb_hs_buf *buf;
+
+ buf = metrics_get_latest();
+ if (!buf) {
+ mk_http_status(request, 404);
+ mk_http_done(request);
+ return;
+ }
+
+ buf->users++;
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL);
+ mk_http_done(request);
+
+ buf->users--;
+}
+
+/* Perform registration */
+int api_v1_metrics(struct flb_hs *hs)
+{
+
+ pthread_key_create(&hs_metrics_key, hs_metrics_key_destroy);
+
+ /* Create a message queue */
+ hs->qid_metrics = mk_mq_create(hs->ctx, "/metrics",
+ cb_mq_metrics, NULL);
+
+ /* HTTP end-points */
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics/prometheus",
+ cb_metrics_prometheus, hs);
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics", cb_metrics, hs);
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/metrics.h b/fluent-bit/src/http_server/api/v1/metrics.h
new file mode 100644
index 00000000..f6bb0d01
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/metrics.h
@@ -0,0 +1,30 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V1_METRICS_H
+#define FLB_HS_API_V1_METRICS_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+#include <fluent-bit/flb_sds.h>
+
+int api_v1_metrics(struct flb_hs *hs);
+flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/plugins.c b/fluent-bit/src/http_server/api/v1/plugins.c
new file mode 100644
index 00000000..1b63843c
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/plugins.c
@@ -0,0 +1,109 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_server.h>
+#include <msgpack.h>
+
+/* API: List all built-in plugins */
+static void cb_plugins(mk_request_t *request, void *data)
+{
+ int len;
+ flb_sds_t out_buf;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct mk_list *head;
+ struct flb_input_plugin *in;
+ struct flb_filter_plugin *filter;
+ struct flb_output_plugin *out;
+ struct flb_hs *hs = data;
+ struct flb_config *config = hs->config;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "plugins", 7);
+
+ /*
+ * plugins are: inputs, filters, outputs
+ */
+ msgpack_pack_map(&mp_pck, 3);
+
+ /* Inputs */
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "inputs", 6);
+ len = mk_list_size(&config->in_plugins);
+ msgpack_pack_array(&mp_pck, len);
+ mk_list_foreach(head, &hs->config->in_plugins) {
+ in = mk_list_entry(head, struct flb_input_plugin, _head);
+ len = strlen(in->name);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, in->name, len);
+ }
+
+ /* Filters */
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "filters", 7);
+ len = mk_list_size(&config->filter_plugins);
+ msgpack_pack_array(&mp_pck, len);
+ mk_list_foreach(head, &config->filter_plugins) {
+ filter = mk_list_entry(head, struct flb_filter_plugin, _head);
+ len = strlen(filter->name);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, filter->name, len);
+ }
+
+ /* Outputs */
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "outputs", 7);
+ len = mk_list_size(&config->out_plugins);
+ msgpack_pack_array(&mp_pck, len);
+ mk_list_foreach(head, &config->out_plugins) {
+ out = mk_list_entry(head, struct flb_output_plugin, _head);
+ len = strlen(out->name);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, out->name, len);
+ }
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ mk_http_status(request, 200);
+ mk_http_send(request,
+ out_buf, flb_sds_len(out_buf), NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+/* Perform registration */
+int api_v1_plugins(struct flb_hs *hs)
+{
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/plugins", cb_plugins, hs);
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/plugins.h b/fluent-bit/src/http_server/api/v1/plugins.h
new file mode 100644
index 00000000..d2094032
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/plugins.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V1_PLUGINS_H
+#define FLB_HS_API_V1_PLUGINS_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v1_plugins(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/register.c b/fluent-bit/src/http_server/api/v1/register.c
new file mode 100644
index 00000000..093644b3
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/register.c
@@ -0,0 +1,49 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+#include "uptime.h"
+#include "metrics.h"
+#include "storage.h"
+#include "plugins.h"
+#include "health.h"
+#include "trace.h"
+
+int api_v1_registration(struct flb_hs *hs)
+{
+ api_v1_uptime(hs);
+ api_v1_metrics(hs);
+ api_v1_plugins(hs);
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ api_v1_trace(hs);
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+ if (hs->config->health_check == FLB_TRUE) {
+ api_v1_health(hs);
+ }
+
+ if (hs->config->storage_metrics == FLB_TRUE) {
+ api_v1_storage_metrics(hs);
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/register.h b/fluent-bit/src/http_server/api/v1/register.h
new file mode 100644
index 00000000..978db9e0
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/register.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_API_V1_REG_H
+#define FLB_API_V1_REG_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v1_registration(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/storage.c b/fluent-bit/src/http_server/api/v1/storage.c
new file mode 100644
index 00000000..df47859d
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/storage.c
@@ -0,0 +1,204 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_sds.h>
+#include "storage.h"
+
+#include <fluent-bit/flb_http_server.h>
+#include <msgpack.h>
+
+pthread_key_t hs_storage_metrics_key;
+
+/* Return the newest storage metrics buffer */
+static struct flb_hs_buf *storage_metrics_get_latest()
+{
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list;
+
+ metrics_list = pthread_getspecific(hs_storage_metrics_key);
+ if (!metrics_list) {
+ return NULL;
+ }
+
+ if (mk_list_size(metrics_list) == 0) {
+ return NULL;
+ }
+
+ buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head);
+ return buf;
+}
+
+/* Delete unused metrics, note that we only care about the latest node */
+static int cleanup_metrics()
+{
+ int c = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *metrics_list;
+ struct flb_hs_buf *last;
+ struct flb_hs_buf *entry;
+
+ metrics_list = pthread_getspecific(hs_storage_metrics_key);
+ if (!metrics_list) {
+ return -1;
+ }
+
+ last = storage_metrics_get_latest();
+ if (!last) {
+ return -1;
+ }
+
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != last && entry->users == 0) {
+ mk_list_del(&entry->_head);
+ flb_sds_destroy(entry->data);
+ flb_free(entry->raw_data);
+ flb_free(entry);
+ c++;
+ }
+ }
+
+ return c;
+}
+
+/*
+ * Callback invoked every time some storage metrics are received through a
+ * message queue channel. This function runs in a Monkey HTTP thread
+ * worker and it purpose is to take the metrics data and store it
+ * somewhere so then it can be available by the end-points upon
+ * HTTP client requests.
+ */
+static void cb_mq_storage_metrics(mk_mq_t *queue, void *data, size_t size)
+{
+ flb_sds_t out_data;
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = pthread_getspecific(hs_storage_metrics_key);
+ if (!metrics_list) {
+ metrics_list = flb_malloc(sizeof(struct mk_list));
+ if (!metrics_list) {
+ flb_errno();
+ return;
+ }
+ mk_list_init(metrics_list);
+ pthread_setspecific(hs_storage_metrics_key, metrics_list);
+ }
+
+ /* Convert msgpack to JSON */
+ out_data = flb_msgpack_raw_to_json_sds(data, size);
+ if (!out_data) {
+ return;
+ }
+
+ buf = flb_malloc(sizeof(struct flb_hs_buf));
+ if (!buf) {
+ flb_errno();
+ flb_sds_destroy(out_data);
+ return;
+ }
+ buf->users = 0;
+ buf->data = out_data;
+
+ buf->raw_data = flb_malloc(size);
+ memcpy(buf->raw_data, data, size);
+ buf->raw_size = size;
+
+ mk_list_add(&buf->_head, metrics_list);
+
+ cleanup_metrics();
+}
+
+/* FIXME: pending implementation of metrics exit interface
+static void cb_mq_storage_metrics_exit(mk_mq_t *queue, void *data)
+{
+
+}
+*/
+
+/* API: expose built-in storage metrics /api/v1/storage */
+static void cb_storage(mk_request_t *request, void *data)
+{
+ struct flb_hs_buf *buf;
+
+ buf = storage_metrics_get_latest();
+ if (!buf) {
+ mk_http_status(request, 404);
+ mk_http_done(request);
+ return;
+ }
+
+ buf->users++;
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL);
+ mk_http_done(request);
+
+ buf->users--;
+}
+
+static void hs_storage_metrics_key_destroy(void *data)
+{
+ struct mk_list *metrics_list = (struct mk_list*)data;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_hs_buf *entry;
+
+ if (metrics_list == NULL) {
+ return;
+ }
+
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != NULL) {
+ if (entry->raw_data != NULL) {
+ flb_free(entry->raw_data);
+ entry->raw_data = NULL;
+ }
+ if (entry->data) {
+ flb_sds_destroy(entry->data);
+ entry->data = NULL;
+ }
+ mk_list_del(&entry->_head);
+ flb_free(entry);
+ }
+ }
+
+ flb_free(metrics_list);
+}
+
+/* Perform registration */
+int api_v1_storage_metrics(struct flb_hs *hs)
+{
+ pthread_key_create(&hs_storage_metrics_key, hs_storage_metrics_key_destroy);
+
+ /* Create a message queue */
+ hs->qid_storage = mk_mq_create(hs->ctx, "/storage",
+ cb_mq_storage_metrics,
+ NULL);
+
+ /* HTTP end-point */
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/storage", cb_storage, hs);
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/storage.h b/fluent-bit/src/http_server/api/v1/storage.h
new file mode 100644
index 00000000..27410c79
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/storage.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V1_STORAGE_METRICS_H
+#define FLB_HS_API_V1_STORAGE_METRICS_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v1_storage_metrics(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/trace.c b/fluent-bit/src/http_server/api/v1/trace.c
new file mode 100644
index 00000000..95da1734
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/trace.c
@@ -0,0 +1,615 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_server.h>
+#include <fluent-bit/flb_lib.h>
+#include <fluent-bit/flb_chunk_trace.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_utils.h>
+#include <msgpack.h>
+
+
+static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
+{
+ struct mk_list *head;
+ struct flb_input_instance *in;
+
+
+ mk_list_foreach(head, &hs->config->inputs) {
+ in = mk_list_entry(head, struct flb_input_instance, _head);
+ if (strcmp(name, in->name) == 0) {
+ return in;
+ }
+ if (in->alias) {
+ if (strcmp(name, in->alias) == 0) {
+ return in;
+ }
+ }
+ }
+ return NULL;
+}
+
+static int enable_trace_input(struct flb_hs *hs, const char *name, const char *prefix, const char *output_name, struct mk_list *props)
+{
+ struct flb_input_instance *in;
+
+
+ in = find_input(hs, name);
+ if (in == NULL) {
+ return 404;
+ }
+
+ flb_chunk_trace_context_new(in, output_name, prefix, NULL, props);
+ return (in->chunk_trace_ctxt == NULL ? 503 : 0);
+}
+
+static int disable_trace_input(struct flb_hs *hs, const char *name)
+{
+ struct flb_input_instance *in;
+
+
+ in = find_input(hs, name);
+ if (in == NULL) {
+ return 404;
+ }
+
+ if (in->chunk_trace_ctxt != NULL) {
+ flb_chunk_trace_context_destroy(in);
+ }
+ return 201;
+}
+
+static flb_sds_t get_input_name(mk_request_t *request)
+{
+ const char *base = "/api/v1/trace/";
+
+
+ if (request->real_path.data == NULL) {
+ return NULL;
+ }
+ if (request->real_path.len < strlen(base)) {
+ return NULL;
+ }
+
+ return flb_sds_create_len(&request->real_path.data[strlen(base)],
+ request->real_path.len - strlen(base));
+}
+
+static int http_disable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck)
+{
+ struct flb_hs *hs = data;
+ int toggled_on = 503;
+
+
+ toggled_on = disable_trace_input(hs, input_name);
+ if (toggled_on < 300) {
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+ return 201;
+ }
+
+ return toggled_on;
+}
+
+static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, const char *input_name)
+{
+ int ret = -1;
+ int i;
+ int x;
+ flb_sds_t prefix = NULL;
+ flb_sds_t output_name = NULL;
+ int toggled_on = -1;
+ msgpack_object *key;
+ msgpack_object *val;
+ struct mk_list *props = NULL;
+ msgpack_object_kv *param;
+ msgpack_object_str *param_key;
+ msgpack_object_str *param_val;
+
+
+ if (result->data.type == MSGPACK_OBJECT_MAP) {
+ for (i = 0; i < result->data.via.map.size; i++) {
+ key = &result->data.via.map.ptr[i].key;
+ val = &result->data.via.map.ptr[i].val;
+
+ if (key->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+
+ if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (prefix != NULL) {
+ flb_sds_destroy(prefix);
+ }
+ prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (output_name != NULL) {
+ flb_sds_destroy(output_name);
+ }
+ output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (props != NULL) {
+ flb_free(props);
+ }
+ props = flb_calloc(1, sizeof(struct mk_list));
+ flb_kv_init(props);
+ for (x = 0; x < val->via.map.size; x++) {
+ param = &val->via.map.ptr[x];
+ if (param->val.type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (param->key.type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ param_key = &param->key.via.str;
+ param_val = &param->val.via.str;
+ flb_kv_item_create_len(props,
+ (char *)param_key->ptr, param_key->size,
+ (char *)param_val->ptr, param_val->size);
+ }
+ }
+ }
+
+ if (output_name == NULL) {
+ output_name = flb_sds_create("stdout");
+ }
+
+ toggled_on = enable_trace_input(hs, input_name, prefix, output_name, props);
+ if (!toggled_on) {
+ ret = -1;
+ goto parse_error;
+ }
+ }
+
+parse_error:
+ if (prefix) flb_sds_destroy(prefix);
+ if (output_name) flb_sds_destroy(output_name);
+ if (props != NULL) {
+ flb_kv_release(props);
+ flb_free(props);
+ }
+ return ret;
+}
+
+static int http_enable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck)
+{
+ char *buf = NULL;
+ size_t buf_size;
+ msgpack_unpacked result;
+ int ret = -1;
+ int rc = -1;
+ int i;
+ int x;
+ size_t off = 0;
+ int root_type = MSGPACK_OBJECT_ARRAY;
+ struct flb_hs *hs = data;
+ flb_sds_t prefix = NULL;
+ flb_sds_t output_name = NULL;
+ msgpack_object *key;
+ msgpack_object *val;
+ struct mk_list *props = NULL;
+ struct flb_chunk_trace_limit limit = { 0 };
+ struct flb_input_instance *input_instance;
+
+
+ if (request->method == MK_METHOD_GET) {
+ ret = enable_trace_input(hs, input_name, "trace.", "stdout", NULL);
+ if (ret == 0) {
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+ return 200;
+ }
+ else {
+ flb_error("unable to enable tracing for %s", input_name);
+ goto input_error;
+ }
+ }
+
+ msgpack_unpacked_init(&result);
+ rc = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size,
+ &root_type, NULL);
+ if (rc == -1) {
+ ret = 503;
+ flb_error("unable to parse json parameters");
+ goto unpack_error;
+ }
+
+ rc = msgpack_unpack_next(&result, buf, buf_size, &off);
+ if (rc != MSGPACK_UNPACK_SUCCESS) {
+ ret = 503;
+ flb_error("unable to unpack msgpack parameters for %s", input_name);
+ goto unpack_error;
+ }
+
+ if (result.data.type == MSGPACK_OBJECT_MAP) {
+ for (i = 0; i < result.data.via.map.size; i++) {
+ key = &result.data.via.map.ptr[i].key;
+ val = &result.data.via.map.ptr[i].val;
+
+ if (key->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("non string key in parameters");
+ goto parse_error;
+ }
+
+ if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("prefix is not a string");
+ goto parse_error;
+ }
+ if (prefix != NULL) {
+ flb_sds_destroy(prefix);
+ }
+ prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output is not a string");
+ goto parse_error;
+ }
+ if (output_name != NULL) {
+ flb_sds_destroy(output_name);
+ }
+ output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = 503;
+ flb_error("output params is not a maps");
+ goto parse_error;
+ }
+ props = flb_calloc(1, sizeof(struct mk_list));
+ flb_kv_init(props);
+ for (x = 0; x < val->via.map.size; x++) {
+ if (val->via.map.ptr[x].val.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output parameter key is not a string");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[x].key.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output parameter value is not a string");
+ goto parse_error;
+ }
+ flb_kv_item_create_len(props,
+ (char *)val->via.map.ptr[x].key.via.str.ptr, val->via.map.ptr[x].key.via.str.size,
+ (char *)val->via.map.ptr[x].val.via.str.ptr, val->via.map.ptr[x].val.via.str.size);
+ }
+ }
+ else if (strncmp(key->via.str.ptr, "limit", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = 503;
+ flb_error("limit must be a map of limit types");
+ goto parse_error;
+ }
+ if (val->via.map.size != 1) {
+ ret = 503;
+ flb_error("limit must have a single limit type");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[0].key.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("limit type (key) must be a string");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[0].val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ ret = 503;
+ flb_error("limit type must be an integer");
+ goto parse_error;
+ }
+ if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "seconds", val->via.map.ptr[0].key.via.str.size) == 0) {
+ limit.type = FLB_CHUNK_TRACE_LIMIT_TIME;
+ limit.seconds = val->via.map.ptr[0].val.via.u64;
+ }
+ else if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "count", val->via.map.ptr[0].key.via.str.size) == 0) {
+ limit.type = FLB_CHUNK_TRACE_LIMIT_COUNT;
+ limit.count = val->via.map.ptr[0].val.via.u64;
+ }
+ else {
+ ret = 503;
+ flb_error("unknown limit type");
+ goto parse_error;
+ }
+ }
+ }
+
+ if (output_name == NULL) {
+ output_name = flb_sds_create("stdout");
+ }
+
+ ret = enable_trace_input(hs, input_name, prefix, output_name, props);
+ if (ret != 0) {
+ flb_error("error when enabling tracing");
+ goto parse_error;
+ }
+
+ if (limit.type != 0) {
+ input_instance = find_input(hs, input_name);
+ if (limit.type == FLB_CHUNK_TRACE_LIMIT_TIME) {
+ flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.seconds);
+ }
+ else if (limit.type == FLB_CHUNK_TRACE_LIMIT_COUNT) {
+ flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.count);
+ }
+ }
+ }
+
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+
+ ret = 200;
+parse_error:
+ if (prefix) flb_sds_destroy(prefix);
+ if (output_name) flb_sds_destroy(output_name);
+ if (props != NULL) {
+ flb_kv_release(props);
+ flb_free(props);
+ }
+unpack_error:
+ msgpack_unpacked_destroy(&result);
+ if (buf != NULL) {
+ flb_free(buf);
+ }
+input_error:
+ return ret;
+}
+
+static void cb_trace(mk_request_t *request, void *data)
+{
+ flb_sds_t out_buf;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ int response = 404;
+ flb_sds_t input_name = NULL;
+
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ input_name = get_input_name(request);
+ if (input_name == NULL) {
+ response = 404;
+ goto error;
+ }
+
+ if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) {
+ response = http_enable_trace(request, data, input_name, &mp_pck);
+ }
+ else if (request->method == MK_METHOD_DELETE) {
+ response = http_disable_trace(request, data, input_name, &mp_pck);
+ }
+error:
+ if (response == 404) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found"));
+ }
+ else if (response == 503) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ }
+
+ if (input_name != NULL) {
+ flb_sds_destroy(input_name);
+ }
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (out_buf == NULL) {
+ mk_http_status(request, 503);
+ mk_http_done(request);
+ return;
+ }
+
+ mk_http_status(request, response);
+ mk_http_send(request, out_buf, flb_sds_len(out_buf), NULL);
+ mk_http_done(request);
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ flb_sds_destroy(out_buf);
+}
+
+static void cb_traces(mk_request_t *request, void *data)
+{
+ flb_sds_t out_buf;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ int ret;
+ char *buf = NULL;
+ size_t buf_size;
+ int root_type = MSGPACK_OBJECT_ARRAY;
+ msgpack_unpacked result;
+ flb_sds_t error_msg = NULL;
+ int response = 200;
+ flb_sds_t input_name;
+ msgpack_object_array *inputs = NULL;
+ size_t off = 0;
+ int i;
+
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_unpacked_init(&result);
+ ret = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size,
+ &root_type, NULL);
+ if (ret == -1) {
+ goto unpack_error;
+ }
+
+ ret = msgpack_unpack_next(&result, buf, buf_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ ret = -1;
+ error_msg = flb_sds_create("unfinished input");
+ goto unpack_error;
+ }
+
+ if (result.data.type != MSGPACK_OBJECT_MAP) {
+ response = 503;
+ error_msg = flb_sds_create("input is not an object");
+ goto unpack_error;
+ }
+
+ for (i = 0; i < result.data.via.map.size; i++) {
+ if (result.data.via.map.ptr[i].val.type != MSGPACK_OBJECT_ARRAY) {
+ continue;
+ }
+ if (result.data.via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+ if (result.data.via.map.ptr[i].key.via.str.size < strlen("inputs")) {
+ continue;
+ }
+ if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, "inputs", strlen("inputs"))) {
+ continue;
+ }
+ inputs = &result.data.via.map.ptr[i].val.via.array;
+ }
+
+ if (inputs == NULL) {
+ response = 503;
+ error_msg = flb_sds_create("inputs not found");
+ goto unpack_error;
+ }
+
+ msgpack_pack_map(&mp_pck, 2);
+
+ msgpack_pack_str_with_body(&mp_pck, "inputs", strlen("inputs"));
+ msgpack_pack_map(&mp_pck, inputs->size);
+
+ for (i = 0; i < inputs->size; i++) {
+ input_name = flb_sds_create_len(inputs->ptr[i].via.str.ptr, inputs->ptr[i].via.str.size);
+ msgpack_pack_str_with_body(&mp_pck, input_name, flb_sds_len(input_name));
+
+ if (inputs->ptr[i].type != MSGPACK_OBJECT_STR) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ }
+ else {
+ if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) {
+ ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, input_name);
+ if (ret != 0) {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "returncode", strlen("returncode"));
+ msgpack_pack_int64(&mp_pck, ret);
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok"));
+ }
+ }
+ else if (request->method == MK_METHOD_DELETE) {
+ disable_trace_input((struct flb_hs *)data, input_name);
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "message", strlen("message"));
+ msgpack_pack_str_with_body(&mp_pck, "method not allowed", strlen("method not allowed"));
+ }
+ }
+ }
+
+ msgpack_pack_str_with_body(&mp_pck, "result", strlen("result"));
+unpack_error:
+ if (buf != NULL) {
+ flb_free(buf);
+ }
+ msgpack_unpacked_destroy(&result);
+ if (response == 404) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found"));
+ }
+ else if (response == 503) {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "message", strlen("message"));
+ if (error_msg) {
+ msgpack_pack_str_with_body(&mp_pck, error_msg, flb_sds_len(error_msg));
+ flb_sds_destroy(error_msg);
+ }
+ else {
+ msgpack_pack_str_with_body(&mp_pck, "unknown error", strlen("unknown error"));
+ }
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok"));
+ }
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (out_buf == NULL) {
+ out_buf = flb_sds_create("serialization error");
+ }
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ mk_http_status(request, response);
+ mk_http_send(request,
+ out_buf, flb_sds_len(out_buf), NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+/* Perform registration */
+int api_v1_trace(struct flb_hs *hs)
+{
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs);
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs);
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/trace.h b/fluent-bit/src/http_server/api/v1/trace.h
new file mode 100644
index 00000000..236925de
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/trace.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V1_TRACE_H
+#define FLB_HS_API_V1_TRACE_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v1_trace(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v1/uptime.c b/fluent-bit/src/http_server/api/v1/uptime.c
new file mode 100644
index 00000000..37f97187
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/uptime.c
@@ -0,0 +1,111 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_http_server.h>
+#include <fluent-bit/flb_mem.h>
+
+#define FLB_UPTIME_ONEDAY 86400
+#define FLB_UPTIME_ONEHOUR 3600
+#define FLB_UPTIME_ONEMINUTE 60
+
+/* Append human-readable uptime */
+static void uptime_hr(time_t uptime, msgpack_packer *mp_pck)
+{
+ int len;
+ int days;
+ int hours;
+ int minutes;
+ int seconds;
+ long int upmind;
+ long int upminh;
+ char buf[256];
+
+ /* days */
+ days = uptime / FLB_UPTIME_ONEDAY;
+ upmind = uptime - (days * FLB_UPTIME_ONEDAY);
+
+ /* hours */
+ hours = upmind / FLB_UPTIME_ONEHOUR;
+ upminh = upmind - hours * FLB_UPTIME_ONEHOUR;
+
+ /* minutes */
+ minutes = upminh / FLB_UPTIME_ONEMINUTE;
+ seconds = upminh - minutes * FLB_UPTIME_ONEMINUTE;
+
+ len = snprintf(buf, sizeof(buf) - 1,
+ "Fluent Bit has been running: "
+ " %i day%s, %i hour%s, %i minute%s and %i second%s",
+ days, (days > 1) ? "s" : "", hours, \
+ (hours > 1) ? "s" : "", minutes, \
+ (minutes > 1) ? "s" : "", seconds, \
+ (seconds > 1) ? "s" : "");
+ msgpack_pack_str(mp_pck, 9);
+ msgpack_pack_str_body(mp_pck, "uptime_hr", 9);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, buf, len);
+}
+
+/* API: List all built-in plugins */
+static void cb_uptime(mk_request_t *request, void *data)
+{
+ flb_sds_t out_buf;
+ size_t out_size;
+ time_t uptime;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct flb_hs *hs = data;
+ struct flb_config *config = hs->config;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "uptime_sec", 10);
+
+ uptime = time(NULL) - config->init_time;
+ msgpack_pack_uint64(&mp_pck, uptime);
+
+ uptime_hr(uptime, &mp_pck);
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!out_buf) {
+ return;
+ }
+ out_size = flb_sds_len(out_buf);
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, out_buf, out_size, NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+/* Perform registration */
+int api_v1_uptime(struct flb_hs *hs)
+{
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/uptime", cb_uptime, hs);
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v1/uptime.h b/fluent-bit/src/http_server/api/v1/uptime.h
new file mode 100644
index 00000000..5a424779
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/uptime.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V1_UPTIME_H
+#define FLB_HS_API_V1_UPTIME_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v1_uptime(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v2/CMakeLists.txt b/fluent-bit/src/http_server/api/v2/CMakeLists.txt
new file mode 100644
index 00000000..a9d590fb
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/CMakeLists.txt
@@ -0,0 +1,10 @@
+# api/v2
+set(src
+ metrics.c
+ reload.c
+ register.c
+ )
+
+include_directories(${MONKEY_INCLUDE_DIR})
+add_library(api-v2 STATIC ${src})
+target_link_libraries(api-v2 monkey-core-static fluent-bit-static)
diff --git a/fluent-bit/src/http_server/api/v2/metrics.c b/fluent-bit/src/http_server/api/v2/metrics.c
new file mode 100644
index 00000000..27513b7a
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/metrics.c
@@ -0,0 +1,259 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_time.h>
+#include "metrics.h"
+
+#include <fluent-bit/flb_http_server.h>
+
+#define null_check(x) do { if (!x) { goto error; } else {sds = x;} } while (0)
+
+pthread_key_t hs_metrics_v2_key;
+
+static struct mk_list *hs_metrics_v2_key_create()
+{
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = flb_malloc(sizeof(struct mk_list));
+ if (metrics_list == NULL) {
+ flb_errno();
+ return NULL;
+ }
+ mk_list_init(metrics_list);
+ pthread_setspecific(hs_metrics_v2_key, metrics_list);
+
+ return metrics_list;
+}
+
+static void hs_metrics_v2_key_destroy(void *data)
+{
+ struct mk_list *metrics_list = (struct mk_list*) data;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_hs_buf *entry;
+
+ if (metrics_list == NULL) {
+ return;
+ }
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != NULL) {
+ if (entry->raw_data != NULL) {
+ cmt_destroy(entry->raw_data);
+ entry->raw_data = NULL;
+ }
+ mk_list_del(&entry->_head);
+ flb_free(entry);
+ }
+ }
+
+ flb_free(metrics_list);
+}
+
+/* Return the newest metrics buffer */
+static struct flb_hs_buf *metrics_get_latest()
+{
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list;
+
+ metrics_list = pthread_getspecific(hs_metrics_v2_key);
+ if (!metrics_list) {
+ return NULL;
+ }
+
+ if (mk_list_size(metrics_list) == 0) {
+ return NULL;
+ }
+
+ buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head);
+ return buf;
+}
+
+/* Delete unused metrics, note that we only care about the latest node */
+static int cleanup_metrics()
+{
+ int c = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *metrics_list;
+ struct flb_hs_buf *last;
+ struct flb_hs_buf *entry;
+
+ metrics_list = pthread_getspecific(hs_metrics_v2_key);
+ if (!metrics_list) {
+ return -1;
+ }
+
+ last = metrics_get_latest();
+ if (!last) {
+ return -1;
+ }
+
+ mk_list_foreach_safe(head, tmp, metrics_list) {
+ entry = mk_list_entry(head, struct flb_hs_buf, _head);
+ if (entry != last && entry->users == 0) {
+ mk_list_del(&entry->_head);
+ cmt_destroy(entry->raw_data);
+ flb_free(entry);
+ c++;
+ }
+ }
+
+ return c;
+}
+
+/*
+ * Callback invoked every time some metrics are received through a message queue channel.
+ * This function runs in a Monkey HTTP thread worker and it purpose is to take the metrics
+ * data and store it somewhere so then it can be available by the end-points upon
+ * HTTP client requests.
+ */
+static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size)
+{
+ int ret;
+ size_t off = 0;
+ struct cmt *cmt;
+ struct flb_hs_buf *buf;
+ struct mk_list *metrics_list = NULL;
+
+ metrics_list = pthread_getspecific(hs_metrics_v2_key);
+ if (!metrics_list) {
+ metrics_list = hs_metrics_v2_key_create();
+ if (metrics_list == NULL) {
+ return;
+ }
+ }
+
+ /* decode cmetrics */
+ ret = cmt_decode_msgpack_create(&cmt, data, size, &off);
+ if (ret != 0) {
+ return;
+ }
+
+ buf = flb_malloc(sizeof(struct flb_hs_buf));
+ if (!buf) {
+ flb_errno();
+ return;
+ }
+ buf->users = 0;
+ buf->data = NULL;
+
+ /* Store CMetrics context as the raw_data */
+ buf->raw_data = cmt;
+ buf->raw_size = 0;
+
+ mk_list_add(&buf->_head, metrics_list);
+ cleanup_metrics();
+}
+
+/* API: expose metrics in Prometheus format /api/v2/metrics/prometheus */
+static void cb_metrics_prometheus(mk_request_t *request, void *data)
+{
+ struct cmt *cmt;
+ struct flb_hs_buf *buf;
+ cfl_sds_t payload;
+
+ buf = metrics_get_latest();
+ if (!buf) {
+ mk_http_status(request, 404);
+ mk_http_done(request);
+ return;
+ }
+
+ cmt = (struct cmt *) buf->raw_data;
+
+ /* convert CMetrics to text */
+ payload = cmt_encode_prometheus_create(cmt, CMT_FALSE);
+ if (!payload) {
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ return;
+ }
+
+ buf->users++;
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS);
+ mk_http_send(request, payload, cfl_sds_len(payload), NULL);
+ mk_http_done(request);
+
+ cmt_encode_prometheus_destroy(payload);
+
+ buf->users--;
+}
+
+/* API: expose built-in metrics /api/v1/metrics (JSON format) */
+static void cb_metrics(mk_request_t *request, void *data)
+{
+ struct cmt *cmt;
+ struct flb_hs_buf *buf;
+ cfl_sds_t payload;
+
+ buf = metrics_get_latest();
+ if (!buf) {
+ mk_http_status(request, 404);
+ mk_http_done(request);
+ return;
+ }
+
+ cmt = (struct cmt *) buf->raw_data;
+
+ /* convert CMetrics to text */
+ payload = cmt_encode_text_create(cmt);
+ if (!payload) {
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ return;
+ }
+
+ buf->users++;
+
+ mk_http_status(request, 200);
+ mk_http_send(request, payload, cfl_sds_len(payload), NULL);
+ mk_http_done(request);
+
+ cmt_encode_text_destroy(payload);
+
+ buf->users--;
+}
+
+/* Perform registration */
+int api_v2_metrics(struct flb_hs *hs)
+{
+
+ pthread_key_create(&hs_metrics_v2_key, hs_metrics_v2_key_destroy);
+
+ /* Create a message queue */
+ hs->qid_metrics_v2 = mk_mq_create(hs->ctx, "/metrics_v2",
+ cb_mq_metrics, NULL);
+ /* HTTP end-points */
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/metrics/prometheus",
+ cb_metrics_prometheus, hs);
+
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/metrics", cb_metrics, hs);
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v2/metrics.h b/fluent-bit/src/http_server/api/v2/metrics.h
new file mode 100644
index 00000000..5336865d
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/metrics.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V2_METRICS_H
+#define FLB_HS_API_V2_METRICS_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v2_metrics(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v2/register.c b/fluent-bit/src/http_server/api/v2/register.c
new file mode 100644
index 00000000..7a0956fb
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/register.c
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+#include "metrics.h"
+#include "reload.h"
+
+int api_v2_registration(struct flb_hs *hs)
+{
+ api_v2_reload(hs);
+ api_v2_metrics(hs);
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v2/register.h b/fluent-bit/src/http_server/api/v2/register.h
new file mode 100644
index 00000000..da6d78f3
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/register.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_API_V2_REG_H
+#define FLB_API_V2_REG_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v2_registration(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/api/v2/reload.c b/fluent-bit/src/http_server/api/v2/reload.c
new file mode 100644
index 00000000..3bb5159f
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/reload.c
@@ -0,0 +1,161 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_lib.h>
+#include <fluent-bit/flb_reload.h>
+#include "reload.h"
+
+#include <signal.h>
+
+#include <fluent-bit/flb_http_server.h>
+
+static void handle_reload_request(mk_request_t *request, struct flb_config *config)
+{
+ int ret;
+ flb_sds_t out_buf;
+ size_t out_size;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "reload", 6);
+
+#ifdef FLB_SYSTEM_WINDOWS
+ ret = -1;
+
+ msgpack_pack_str(&mp_pck, 11);
+ msgpack_pack_str_body(&mp_pck, "unsupported", 11);
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "status", 6);
+ msgpack_pack_int64(&mp_pck, ret);
+#else
+ if (config->enable_hot_reload != FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, 11);
+ msgpack_pack_str_body(&mp_pck, "not enabled", 11);
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "status", 6);
+ msgpack_pack_int64(&mp_pck, -1);
+ }
+ else {
+ ret = kill(getpid(), SIGHUP);
+ if (ret != 0) {
+ mk_http_status(request, 500);
+ mk_http_done(request);
+ return;
+ }
+
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "done", 4);
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "status", 6);
+ msgpack_pack_int64(&mp_pck, ret);
+ }
+
+#endif
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!out_buf) {
+ mk_http_status(request, 400);
+ mk_http_done(request);
+ return;
+ }
+ out_size = flb_sds_len(out_buf);
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, out_buf, out_size, NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+static void handle_get_reload_status(mk_request_t *request, struct flb_config *config)
+{
+ flb_sds_t out_buf;
+ size_t out_size;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 16);
+ msgpack_pack_str_body(&mp_pck, "hot_reload_count", 16);
+ msgpack_pack_int64(&mp_pck, config->hot_reloaded_count);
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (!out_buf) {
+ mk_http_status(request, 400);
+ mk_http_done(request);
+ return;
+ }
+ out_size = flb_sds_len(out_buf);
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, out_buf, out_size, NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+static void cb_reload(mk_request_t *request, void *data)
+{
+ struct flb_hs *hs = data;
+ struct flb_config *config = hs->config;
+
+ if (request->method == MK_METHOD_POST ||
+ request->method == MK_METHOD_PUT) {
+ handle_reload_request(request, config);
+ }
+ else if (request->method == MK_METHOD_GET) {
+ handle_get_reload_status(request, config);
+ }
+ else {
+ mk_http_status(request, 400);
+ mk_http_done(request);
+ }
+}
+
+/* Perform registration */
+int api_v2_reload(struct flb_hs *hs)
+{
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/reload", cb_reload, hs);
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/api/v2/reload.h b/fluent-bit/src/http_server/api/v2/reload.h
new file mode 100644
index 00000000..e64e867d
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v2/reload.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_HS_API_V2_RELOAD_H
+#define FLB_HS_API_V2_RELOAD_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_http_server.h>
+
+int api_v2_reload(struct flb_hs *hs);
+
+#endif
diff --git a/fluent-bit/src/http_server/flb_hs.c b/fluent-bit/src/http_server/flb_hs.c
new file mode 100644
index 00000000..40cc8467
--- /dev/null
+++ b/fluent-bit/src/http_server/flb_hs.c
@@ -0,0 +1,147 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2019-2020 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_http_server.h>
+
+#include <monkey/mk_lib.h>
+
+/* v1 */
+#include "api/v1/register.h"
+#include "api/v1/health.h"
+
+/* v2 */
+#include "api/v2/register.h"
+
+static void cb_root(mk_request_t *request, void *data)
+{
+ struct flb_hs *hs = data;
+
+ mk_http_status(request, 200);
+ flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON);
+ mk_http_send(request, hs->ep_root_buf, hs->ep_root_size, NULL);
+ mk_http_done(request);
+}
+
+/* Ingest health metrics into the web service context */
+int flb_hs_push_health_metrics(struct flb_hs *hs, void *data, size_t size)
+{
+ return mk_mq_send(hs->ctx, hs->qid_health, data, size);
+}
+
+/* Ingest pipeline metrics into the web service context */
+int flb_hs_push_pipeline_metrics(struct flb_hs *hs, void *data, size_t size)
+{
+ return mk_mq_send(hs->ctx, hs->qid_metrics, data, size);
+}
+
+/* Ingest pipeline metrics into the web service context */
+int flb_hs_push_metrics(struct flb_hs *hs, void *data, size_t size)
+{
+ return mk_mq_send(hs->ctx, hs->qid_metrics_v2, data, size);
+}
+
+/* Ingest storage metrics into the web service context */
+int flb_hs_push_storage_metrics(struct flb_hs *hs, void *data, size_t size)
+{
+ return mk_mq_send(hs->ctx, hs->qid_storage, data, size);
+}
+
+/* Create ROOT endpoints */
+struct flb_hs *flb_hs_create(const char *listen, const char *tcp_port,
+ struct flb_config *config)
+{
+ int vid;
+ char tmp[32];
+ struct flb_hs *hs;
+
+ hs = flb_calloc(1, sizeof(struct flb_hs));
+ if (!hs) {
+ flb_errno();
+ return NULL;
+ }
+ hs->config = config;
+
+ /* Setup endpoint specific data */
+ flb_hs_endpoints(hs);
+
+ /* Create HTTP server context */
+ hs->ctx = mk_create();
+ if (!hs->ctx) {
+ flb_error("[http_server] could not create context");
+ flb_free(hs);
+ return NULL;
+ }
+
+ /* Compose listen address */
+ snprintf(tmp, sizeof(tmp) -1, "%s:%s", listen, tcp_port);
+ mk_config_set(hs->ctx, "Listen", tmp, NULL);
+ vid = mk_vhost_create(hs->ctx, NULL);
+ hs->vid = vid;
+
+ /* Setup virtual host */
+ mk_vhost_set(hs->ctx, vid,
+ "Name", "fluent-bit",
+ NULL);
+
+
+ /* Register endpoints for /api/v1 */
+ api_v1_registration(hs);
+
+ /* Register endpoints for /api/v2 */
+ api_v2_registration(hs);
+
+ /* Root */
+ mk_vhost_handler(hs->ctx, vid, "/", cb_root, hs);
+
+ return hs;
+}
+
+int flb_hs_start(struct flb_hs *hs)
+{
+ int ret;
+ struct flb_config *config = hs->config;
+
+ ret = mk_start(hs->ctx);
+
+ if (ret == 0) {
+ flb_info("[http_server] listen iface=%s tcp_port=%s",
+ config->http_listen, config->http_port);
+ }
+ return ret;
+}
+
+int flb_hs_destroy(struct flb_hs *hs)
+{
+ if (!hs) {
+ return 0;
+ }
+ flb_hs_health_destroy();
+ mk_stop(hs->ctx);
+ mk_destroy(hs->ctx);
+
+ flb_hs_endpoints_free(hs);
+ flb_free(hs);
+
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/flb_hs_endpoints.c b/fluent-bit/src/http_server/flb_hs_endpoints.c
new file mode 100644
index 00000000..2ea12a98
--- /dev/null
+++ b/fluent-bit/src/http_server/flb_hs_endpoints.c
@@ -0,0 +1,121 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2019-2020 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_http_server.h>
+#include <msgpack.h>
+
+/* Create a JSON buffer with informational data about the running service */
+static int endpoint_root(struct flb_hs *hs)
+{
+ int c;
+ flb_sds_t out_buf;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct mk_list *head;
+ struct mk_list *list;
+ struct flb_split_entry *entry;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "fluent-bit", 10);
+
+ /* entries under fluent-bit parent:
+ *
+ * - version
+ * - edition
+ * - built flags
+ */
+ msgpack_pack_map(&mp_pck, 3);
+
+ /* fluent-bit['version'] */
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "version", 7);
+ msgpack_pack_str(&mp_pck, sizeof(FLB_VERSION_STR) - 1);
+ msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1);
+
+ /* fluent-bit['edition'] */
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "edition", 7);
+#ifdef FLB_ENTERPRISE
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "Enterprise", 10);
+#else
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "Community", 9);
+#endif
+
+ /* fluent-bit['flags'] */
+ msgpack_pack_str(&mp_pck, 5);
+ msgpack_pack_str_body(&mp_pck, "flags", 5);
+
+ c = 0;
+ list = flb_utils_split(FLB_INFO_FLAGS, ' ', -1);
+ mk_list_foreach(head, list) {
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ if (strncmp(entry->value, "FLB_", 4) == 0) {
+ c++;
+ }
+ }
+
+ msgpack_pack_array(&mp_pck, c);
+ mk_list_foreach(head, list) {
+ entry = mk_list_entry(head, struct flb_split_entry, _head);
+ if (strncmp(entry->value, "FLB_", 4) == 0) {
+ msgpack_pack_str(&mp_pck, entry->len);
+ msgpack_pack_str_body(&mp_pck, entry->value, entry->len);
+ }
+ }
+ flb_utils_split_free(list);
+
+ /* export as JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (out_buf) {
+ hs->ep_root_buf = out_buf;
+ hs->ep_root_size = flb_sds_len(out_buf);
+ }
+
+ return -1;
+}
+
+int flb_hs_endpoints(struct flb_hs *hs)
+{
+ endpoint_root(hs);
+ return 0;
+}
+
+/* Release cached data from endpoints */
+int flb_hs_endpoints_free(struct flb_hs *hs)
+{
+ if (hs->ep_root_buf) {
+ flb_sds_destroy(hs->ep_root_buf);
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/src/http_server/flb_hs_utils.c b/fluent-bit/src/http_server/flb_hs_utils.c
new file mode 100644
index 00000000..7b39bff2
--- /dev/null
+++ b/fluent-bit/src/http_server/flb_hs_utils.c
@@ -0,0 +1,48 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_http_server.h>
+#include <monkey/mk_lib.h>
+
+int flb_hs_add_content_type_to_req(mk_request_t *request, int type)
+{
+ if (request == NULL) {
+ return -1;
+ }
+
+ switch (type) {
+ case FLB_HS_CONTENT_TYPE_JSON:
+ mk_http_header(request,
+ FLB_HS_CONTENT_TYPE_KEY_STR, FLB_HS_CONTENT_TYPE_KEY_LEN,
+ FLB_HS_CONTENT_TYPE_JSON_STR, FLB_HS_CONTENT_TYPE_JSON_LEN);
+ break;
+ case FLB_HS_CONTENT_TYPE_PROMETHEUS:
+ mk_http_header(request,
+ FLB_HS_CONTENT_TYPE_KEY_STR, FLB_HS_CONTENT_TYPE_KEY_LEN,
+ FLB_HS_CONTENT_TYPE_PROMETHEUS_STR, FLB_HS_CONTENT_TYPE_PROMETHEUS_LEN);
+ break;
+ default:
+ flb_error("[%s] unknown type=%d", __FUNCTION__, type);
+ return -1;
+ }
+
+ return 0;
+}