diff options
Diffstat (limited to '')
26 files changed, 3146 insertions, 0 deletions
diff --git a/fluent-bit/src/http_server/CMakeLists.txt b/fluent-bit/src/http_server/CMakeLists.txt new file mode 100644 index 00000000..acded936 --- /dev/null +++ b/fluent-bit/src/http_server/CMakeLists.txt @@ -0,0 +1,20 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "FLB_HTTP_SERVER requires FLB_METRICS=On.") +endif() + +# Core Source +set(src + flb_hs.c + flb_hs_endpoints.c + flb_hs_utils.c + ) + +# api/v1 +add_subdirectory(api/v1) + +# api/v2 +add_subdirectory(api/v2) + +include_directories(${MONKEY_INCLUDE_DIR}) +add_library(flb-http-server STATIC ${src}) +target_link_libraries(flb-http-server monkey-core-static api-v1 api-v2) diff --git a/fluent-bit/src/http_server/api/v1/CMakeLists.txt b/fluent-bit/src/http_server/api/v1/CMakeLists.txt new file mode 100644 index 00000000..af86e43f --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/CMakeLists.txt @@ -0,0 +1,20 @@ +# api/v1 +set(src + uptime.c + metrics.c + storage.c + plugins.c + register.c + health.c + ) + +if(FLB_CHUNK_TRACE) + set(src + ${src} + trace.c + ) +endif() + +include_directories(${MONKEY_INCLUDE_DIR}) +add_library(api-v1 STATIC ${src}) +target_link_libraries(api-v1 monkey-core-static fluent-bit-static) diff --git a/fluent-bit/src/http_server/api/v1/health.c b/fluent-bit/src/http_server/api/v1/health.c new file mode 100644 index 00000000..713d4b87 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/health.c @@ -0,0 +1,335 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include<stdio.h> +#include <stdlib.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_macros.h> +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +#include "health.h" + +struct flb_health_check_metrics_counter *metrics_counter; + +pthread_key_t hs_health_key; + +static struct mk_list *hs_health_key_create() +{ + struct mk_list *metrics_list = NULL; + + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (!metrics_list) { + flb_errno(); + return NULL; + } + mk_list_init(metrics_list); + pthread_setspecific(hs_health_key, metrics_list); + + return metrics_list; +} + +static void hs_health_key_destroy(void *data) +{ + struct mk_list *metrics_list = (struct mk_list*)data; + struct mk_list *tmp; + struct mk_list *head; + struct flb_hs_hc_buf *entry; + + if (metrics_list == NULL) { + return; + } + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_hc_buf, _head); + if (entry != NULL) { + mk_list_del(&entry->_head); + flb_free(entry); + } + } + + flb_free(metrics_list); +} + +/* initialize the metrics counters */ +static void counter_init(struct flb_hs *hs) { + + metrics_counter = flb_malloc(sizeof(struct flb_health_check_metrics_counter)); + + if (!metrics_counter) { + flb_errno(); + return; + } + + metrics_counter->error_counter = 0; + metrics_counter->retry_failure_counter = 0; + metrics_counter->error_limit = hs->config->hc_errors_count; + metrics_counter->retry_failure_limit = hs->config->hc_retry_failure_count; + metrics_counter->period_counter = 0; + metrics_counter->period_limit = hs->config->health_check_period; + +} + +/* +* tell what's the current status for health check +* One default background is that the metrics received and saved into +* message queue every time is a accumulation of error numbers, +* not a error number in recent second. So to get the error number +* in a period, we need to use: +* the error number of the newest metrics message minus +* the error number in oldest metrics of period +*/ +static int is_healthy() { + + struct mk_list *metrics_list; + struct flb_hs_hc_buf *buf; + int period_errors; + int period_retry_failure; + + metrics_list = pthread_getspecific(hs_health_key); + if (metrics_list == NULL) { + metrics_list = hs_health_key_create(); + if (metrics_list == NULL) { + return FLB_FALSE; + } + } + + if (mk_list_is_empty(metrics_list) == 0) { + return FLB_TRUE; + } + + /* Get the error metrics entry from the start time of current period */ + buf = mk_list_entry_first(metrics_list, struct flb_hs_hc_buf, _head); + + /* + * increase user so clean up function won't + * free the memory and delete the data + */ + buf->users++; + + /* the error count saved in message queue is the number of + * error count at that time. So the math is that: + * the error count in current period = (current error count in total) - + * (begin error count in the period) + */ + period_errors = metrics_counter->error_counter - buf->error_count; + period_retry_failure = metrics_counter->retry_failure_counter - + buf->retry_failure_count; + buf->users--; + + if (period_errors > metrics_counter->error_limit || + period_retry_failure > metrics_counter->retry_failure_limit) { + + return FLB_FALSE; + } + + return FLB_TRUE; +} + +/* read the metrics from message queue and update the counter*/ +static void read_metrics(void *data, size_t size, int* error_count, + int* retry_failure_count) +{ + int i; + int j; + int m; + msgpack_unpacked result; + msgpack_object map; + size_t off = 0; + int errors = 0; + int retry_failure = 0; + + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, data, size, &off); + map = result.data; + + for (i = 0; i < map.via.map.size; i++) { + msgpack_object k; + msgpack_object v; + + /* Keys: input, output */ + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + if (k.via.str.size != sizeof("output") - 1 || + strncmp(k.via.str.ptr, "output", k.via.str.size) != 0) { + + continue; + } + /* Iterate sub-map */ + for (j = 0; j < v.via.map.size; j++) { + msgpack_object sv; + + /* Keys: plugin name , values: metrics */ + sv = v.via.map.ptr[j].val; + + for (m = 0; m < sv.via.map.size; m++) { + msgpack_object mk; + msgpack_object mv; + + mk = sv.via.map.ptr[m].key; + mv = sv.via.map.ptr[m].val; + + if (mk.via.str.size == sizeof("errors") - 1 && + strncmp(mk.via.str.ptr, "errors", mk.via.str.size) == 0) { + errors += mv.via.u64; + } + else if (mk.via.str.size == sizeof("retries_failed") - 1 && + strncmp(mk.via.str.ptr, "retries_failed", + mk.via.str.size) == 0) { + retry_failure += mv.via.u64; + } + } + } + } + + *error_count = errors; + *retry_failure_count = retry_failure; + msgpack_unpacked_destroy(&result); +} + +/* +* Delete unused metrics, note that we only care about the latest node +* we use this function to maintain the metrics queue only save the metrics +* in a period. The old metrics which is out of period will be removed +*/ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct flb_hs_hc_buf *entry; + + metrics_list = pthread_getspecific(hs_health_key); + if (!metrics_list) { + return -1; + } + + if (metrics_counter->period_counter < metrics_counter->period_limit) { + return 0; + } + + /* remove the oldest metrics if it's out of period */ + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_hc_buf, _head); + if (metrics_counter->period_counter > metrics_counter->period_limit && + entry->users == 0) { + metrics_counter->period_counter--; + mk_list_del(&entry->_head); + flb_free(entry); + c++; + } + else { + break; + } + } + + return c; +} + +/* + * Callback invoked every time some metrics are received through a + * message queue channel. This function runs in a Monkey HTTP thread + * worker and it purpose is to take the metrics data and record the health + * status based on the metrics. + * This happens every second based on the event config. + * So we treat period_counter to count the time. + * And we maintain a message queue with the size of period limit number + * so every time we get a new metrics data in, if the message queue size is + * large than period limit, we will do the clean up func to + * remove the oldest metrics. + */ +static void cb_mq_health(mk_mq_t *queue, void *data, size_t size) +{ + struct flb_hs_hc_buf *buf; + struct mk_list *metrics_list = NULL; + int error_count = 0; + int retry_failure_count = 0; + + metrics_list = pthread_getspecific(hs_health_key); + + if (metrics_list == NULL) { + metrics_list = hs_health_key_create(); + if (metrics_list == NULL) { + return; + } + } + + metrics_counter->period_counter++; + + /* this is to remove the metrics out of period*/ + cleanup_metrics(); + + buf = flb_malloc(sizeof(struct flb_hs_hc_buf)); + if (!buf) { + flb_errno(); + return; + } + + buf->users = 0; + + read_metrics(data, size, &error_count, &retry_failure_count); + + metrics_counter->error_counter = error_count; + metrics_counter->retry_failure_counter = retry_failure_count; + + buf->error_count = error_count; + buf->retry_failure_count = retry_failure_count; + + mk_list_add(&buf->_head, metrics_list); +} + +/* API: Get fluent Bit Health Status */ +static void cb_health(mk_request_t *request, void *data) +{ + int status = is_healthy(); + + if (status == FLB_TRUE) { + mk_http_status(request, 200); + mk_http_send(request, "ok\n", strlen("ok\n"), NULL); + mk_http_done(request); + } + else { + mk_http_status(request, 500); + mk_http_send(request, "error\n", strlen("error\n"), NULL); + mk_http_done(request); + } +} + +/* Perform registration */ +int api_v1_health(struct flb_hs *hs) +{ + + pthread_key_create(&hs_health_key, hs_health_key_destroy); + + counter_init(hs); + /* Create a message queue */ + hs->qid_health = mk_mq_create(hs->ctx, "/health", + cb_mq_health, NULL); + + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/health", cb_health, hs); + return 0; +} + +void flb_hs_health_destroy() +{ + flb_free(metrics_counter); +} diff --git a/fluent-bit/src/http_server/api/v1/health.h b/fluent-bit/src/http_server/api/v1/health.h new file mode 100644 index 00000000..27a826f4 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/health.h @@ -0,0 +1,73 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +#ifndef FLB_HS_API_V1_HEALTH_H +#define FLB_HS_API_V1_HEALTH_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +struct flb_health_check_metrics_counter { + + /* + * health check error limit, + * setup by customer through config: HC_Errors_Count + */ + int error_limit; + + /* counter the error number in metrics*/ + int error_counter; + + /* + * health check retry failed limit, + * setup by customer through config: HC_Retry_Failure_Count + */ + int retry_failure_limit; + + /* count the retry failed number in metrics*/ + int retry_failure_counter; + + /*period limit, setup by customer through config: HC_Period*/ + int period_limit; + + /* count the seconds in one period*/ + int period_counter; + +}; + + +/* + * error and retry failure buffers that contains certain cached data to be used + * by health check. + */ +struct flb_hs_hc_buf { + int users; + int error_count; + int retry_failure_count; + struct mk_list _head; +}; + +/* health endpoint*/ +int api_v1_health(struct flb_hs *hs); + +/* clean up health resource when shutdown*/ +void flb_hs_health_destroy(); +#endif diff --git a/fluent-bit/src/http_server/api/v1/metrics.c b/fluent-bit/src/http_server/api/v1/metrics.c new file mode 100644 index 00000000..4a541eaa --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/metrics.c @@ -0,0 +1,579 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_time.h> +#include "metrics.h" + +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +#define null_check(x) do { if (!x) { goto error; } else {sds = x;} } while (0) + +pthread_key_t hs_metrics_key; + +static struct mk_list *hs_metrics_key_create() +{ + struct mk_list *metrics_list = NULL; + + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (metrics_list == NULL) { + flb_errno(); + return NULL; + } + mk_list_init(metrics_list); + pthread_setspecific(hs_metrics_key, metrics_list); + + return metrics_list; +} + +static void hs_metrics_key_destroy(void *data) +{ + struct mk_list *metrics_list = (struct mk_list*)data; + struct mk_list *tmp; + struct mk_list *head; + struct flb_hs_buf *entry; + + if (metrics_list == NULL) { + return; + } + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != NULL) { + if (entry->raw_data != NULL) { + flb_free(entry->raw_data); + entry->raw_data = NULL; + } + if (entry->data) { + flb_sds_destroy(entry->data); + entry->data = NULL; + } + mk_list_del(&entry->_head); + flb_free(entry); + } + } + + flb_free(metrics_list); +} + +/* Return the newest metrics buffer */ +static struct flb_hs_buf *metrics_get_latest() +{ + struct flb_hs_buf *buf; + struct mk_list *metrics_list; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + return NULL; + } + + if (mk_list_size(metrics_list) == 0) { + return NULL; + } + + buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head); + return buf; +} + +/* Delete unused metrics, note that we only care about the latest node */ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct flb_hs_buf *last; + struct flb_hs_buf *entry; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + return -1; + } + + last = metrics_get_latest(); + if (!last) { + return -1; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != last && entry->users == 0) { + mk_list_del(&entry->_head); + flb_sds_destroy(entry->data); + flb_free(entry->raw_data); + flb_free(entry); + c++; + } + } + + return c; +} + +/* + * Callback invoked every time some metrics are received through a + * message queue channel. This function runs in a Monkey HTTP thread + * worker and it purpose is to take the metrics data and store it + * somewhere so then it can be available by the end-points upon + * HTTP client requests. + */ +static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size) +{ + flb_sds_t out_data; + struct flb_hs_buf *buf; + struct mk_list *metrics_list = NULL; + + metrics_list = pthread_getspecific(hs_metrics_key); + if (!metrics_list) { + metrics_list = hs_metrics_key_create(); + if (metrics_list == NULL) { + return; + } + } + + /* Convert msgpack to JSON */ + out_data = flb_msgpack_raw_to_json_sds(data, size); + if (!out_data) { + return; + } + + buf = flb_malloc(sizeof(struct flb_hs_buf)); + if (!buf) { + flb_errno(); + flb_sds_destroy(out_data); + return; + } + buf->users = 0; + buf->data = out_data; + + buf->raw_data = flb_malloc(size); + if (!buf->raw_data) { + flb_errno(); + flb_sds_destroy(out_data); + flb_free(buf); + return; + } + memcpy(buf->raw_data, data, size); + buf->raw_size = size; + + mk_list_add(&buf->_head, metrics_list); + + cleanup_metrics(); +} + +int string_cmp(const void* a_arg, const void* b_arg) { + char *a = *(char **)a_arg; + char *b = *(char **)b_arg; + + return strcmp(a, b); +} + +size_t extract_metric_name_end_position(char *s) { + int i; + + for (i = 0; i < flb_sds_len(s); i++) { + if (s[i] == '{') { + return i; + } + } + return 0; +} + +int is_same_metric(char *s1, char *s2) { + int i; + int p1 = extract_metric_name_end_position(s1); + int p2 = extract_metric_name_end_position(s2); + + if (p1 != p2) { + return 0; + } + + for (i = 0; i < p1; i++) { + if (s1[i] != s2[i]) { + return 0; + } + } + return 1; +} + +/* derive HELP text from metricname */ +/* if help text length > 128, increase init memory for metric_helptxt */ +flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt) +{ + if (strstr(metric_name, "input_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of input bytes.\n", 24); + } + else if (strstr(metric_name, "input_records")) { + return flb_sds_cat(*metric_helptxt, " Number of input records.\n", 26); + } + else if (strstr(metric_name, "output_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of output bytes.\n", 25); + } + else if (strstr(metric_name, "output_records")) { + return flb_sds_cat(*metric_helptxt, " Number of output records.\n", 27); + } + else if (strstr(metric_name, "output_errors")) { + return flb_sds_cat(*metric_helptxt, " Number of output errors.\n", 26); + } + else if (strstr(metric_name, "output_retries_failed")) { + return flb_sds_cat(*metric_helptxt, " Number of abandoned batches because the maximum number of re-tries was reached.\n", 81); + } + else if (strstr(metric_name, "output_retries")) { + return flb_sds_cat(*metric_helptxt, " Number of output retries.\n", 27); + } + else if (strstr(metric_name, "output_proc_records")) { + return flb_sds_cat(*metric_helptxt, " Number of processed output records.\n", 37); + } + else if (strstr(metric_name, "output_proc_bytes")) { + return flb_sds_cat(*metric_helptxt, " Number of processed output bytes.\n", 35); + } + else if (strstr(metric_name, "output_dropped_records")) { + return flb_sds_cat(*metric_helptxt, " Number of dropped records.\n", 28); + } + else if (strstr(metric_name, "output_retried_records")) { + return flb_sds_cat(*metric_helptxt, " Number of retried records.\n", 28); + } + else { + return (flb_sds_cat(*metric_helptxt, " Fluentbit metrics.\n", 20)); + } +} + +/* API: expose metrics in Prometheus format /api/v1/metrics/prometheus */ +void cb_metrics_prometheus(mk_request_t *request, void *data) +{ + int i; + int j; + int m; + int len; + int time_len; + int start_time_len; + uint64_t uptime; + size_t index; + size_t num_metrics = 0; + long now; + flb_sds_t sds; + flb_sds_t sds_metric; + flb_sds_t tmp_sds; + struct flb_sds *metric_helptxt_head; + flb_sds_t metric_helptxt; + size_t off = 0; + struct flb_hs_buf *buf; + msgpack_unpacked result; + msgpack_object map; + char tmp[32]; + char time_str[64]; + char start_time_str[64]; + char* *metrics_arr; + struct flb_time tp; + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + /* ref count */ + buf->users++; + + /* Compose outgoing buffer string */ + sds = flb_sds_create_size(1024); + if (!sds) { + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + return; + } + + /* length of HELP text */ + metric_helptxt = flb_sds_create_size(128); + if (!metric_helptxt) { + flb_sds_destroy(sds); + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + return; + } + metric_helptxt_head = FLB_SDS_HEADER(metric_helptxt); + + /* + * fluentbit_input_records[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP + * fluentbit_input_bytes[name="cpu0", hostname="${HOSTNAME}"] NUM TIMESTAMP + */ + index = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, buf->raw_data, buf->raw_size, &off); + map = result.data; + + /* we need to know number of exposed metrics to reserve a memory */ + for (i = 0; i < map.via.map.size; i++) { + msgpack_object v = map.via.map.ptr[i].val; + /* Iterate sub-map */ + for (j = 0; j < v.via.map.size; j++) { + msgpack_object sv = v.via.map.ptr[j].val; + for (m = 0; m < sv.via.map.size; m++) { + num_metrics++; + } + } + } + metrics_arr = flb_malloc(num_metrics * sizeof(char*)); + if (!metrics_arr) { + flb_errno(); + + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + msgpack_unpacked_destroy(&result); + return; + } + + flb_time_get(&tp); + now = flb_time_to_nanosec(&tp) / 1000000; /* in milliseconds */ + time_len = snprintf(time_str, sizeof(time_str) - 1, "%lu", now); + + for (i = 0; i < map.via.map.size; i++) { + msgpack_object k; + msgpack_object v; + + /* Keys: input, output */ + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + /* Iterate sub-map */ + for (j = 0; j < v.via.map.size; j++) { + msgpack_object sk; + msgpack_object sv; + + /* Keys: plugin name , values: metrics */ + sk = v.via.map.ptr[j].key; + sv = v.via.map.ptr[j].val; + + for (m = 0; m < sv.via.map.size; m++) { + msgpack_object mk; + msgpack_object mv; + + mk = sv.via.map.ptr[m].key; + mv = sv.via.map.ptr[m].val; + + /* Convert metric value to string */ + len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 " ", mv.via.u64); + if (len < 0) { + goto error; + } + + /* Allocate buffer */ + sds_metric = flb_sds_create_size(k.via.str.size + + mk.via.str.size + + sk.via.str.size + + len + time_len + 28); + if (sds_metric == NULL) { + goto error; + } + + sds_metric = flb_sds_cat(sds_metric, "fluentbit_", 10); + sds_metric = flb_sds_cat(sds_metric, k.via.str.ptr, k.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "_", 1); + sds_metric = flb_sds_cat(sds_metric, mk.via.str.ptr, mk.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "_total{name=\"", 13); + sds_metric = flb_sds_cat(sds_metric, sk.via.str.ptr, sk.via.str.size); + sds_metric = flb_sds_cat(sds_metric, "\"} ", 3); + sds_metric = flb_sds_cat(sds_metric, tmp, len); + sds_metric = flb_sds_cat(sds_metric, time_str, time_len); + sds_metric = flb_sds_cat(sds_metric, "\n", 1); + metrics_arr[index] = sds_metric; + index++; + } + } + } + + /* Sort metrics in alphabetic order, so we can group them later. */ + qsort(metrics_arr, num_metrics, sizeof(char *), string_cmp); + + /* When a new metric starts add HELP and TYPE annotation. */ + tmp_sds = flb_sds_cat(sds, "# HELP ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0])); + null_check(tmp_sds); + if (!metrics_help_txt(metrics_arr[0], &metric_helptxt)) { + goto error; + } + tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[0], extract_metric_name_end_position(metrics_arr[0])); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, " counter\n", 9); + null_check(tmp_sds); + + for (i = 0; i < num_metrics; i++) { + tmp_sds = flb_sds_cat(sds, metrics_arr[i], strlen(metrics_arr[i])); + null_check(tmp_sds); + if ((i != num_metrics - 1) && (is_same_metric(metrics_arr[i], metrics_arr[i+1]) == 0)) { + tmp_sds = flb_sds_cat(sds, "# HELP ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1])); + null_check(tmp_sds); + metric_helptxt_head->len = 0; + if (!metrics_help_txt(metrics_arr[i+1], &metric_helptxt)) { + goto error; + } + tmp_sds = flb_sds_cat(sds, metric_helptxt, metric_helptxt_head->len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE ", 7); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, metrics_arr[i+1], extract_metric_name_end_position(metrics_arr[i+1])); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, " counter\n", 9); + null_check(tmp_sds); + } + } + + /* Attach uptime */ + uptime = time(NULL) - config->init_time; + len = snprintf(time_str, sizeof(time_str) - 1, "%lu", uptime); + + tmp_sds = flb_sds_cat(sds, + "# HELP fluentbit_uptime Number of seconds that Fluent Bit has " + "been running.\n", 76); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_uptime counter\n", 32); + null_check(tmp_sds); + + tmp_sds = flb_sds_cat(sds, "fluentbit_uptime ", 17); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, time_str, len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\n", 1); + null_check(tmp_sds); + + /* Attach process_start_time_seconds metric. */ + start_time_len = snprintf(start_time_str, sizeof(start_time_str) - 1, + "%lu", config->init_time); + + tmp_sds = flb_sds_cat(sds, "# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n", 89); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE process_start_time_seconds gauge\n", 40); + null_check(tmp_sds); + + tmp_sds = flb_sds_cat(sds, "process_start_time_seconds ", 27); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, start_time_str, start_time_len); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\n", 1); + null_check(tmp_sds); + + /* Attach fluentbit_build_info metric. */ + tmp_sds = flb_sds_cat(sds, "# HELP fluentbit_build_info Build version information.\n", 55); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "# TYPE fluentbit_build_info gauge\n", 34); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "fluentbit_build_info{version=\"", 30); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1); + null_check(tmp_sds); + tmp_sds = flb_sds_cat(sds, "\",edition=\"", 11); + null_check(tmp_sds); +#ifdef FLB_ENTERPRISE + tmp_sds = flb_sds_cat(sds, "Enterprise\"} 1\n", 15); + null_check(tmp_sds); +#else + tmp_sds = flb_sds_cat(sds, "Community\"} 1\n", 14); + null_check(tmp_sds); +#endif + + msgpack_unpacked_destroy(&result); + buf->users--; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS); + mk_http_send(request, sds, flb_sds_len(sds), NULL); + for (i = 0; i < num_metrics; i++) { + flb_sds_destroy(metrics_arr[i]); + } + flb_free(metrics_arr); + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + + mk_http_done(request); + return; + +error: + mk_http_status(request, 500); + mk_http_done(request); + buf->users--; + + for (i = 0; i < index; i++) { + flb_sds_destroy(metrics_arr[i]); + } + flb_free(metrics_arr); + flb_sds_destroy(sds); + flb_sds_destroy(metric_helptxt); + msgpack_unpacked_destroy(&result); +} + +/* API: expose built-in metrics /api/v1/metrics */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct flb_hs_buf *buf; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL); + mk_http_done(request); + + buf->users--; +} + +/* Perform registration */ +int api_v1_metrics(struct flb_hs *hs) +{ + + pthread_key_create(&hs_metrics_key, hs_metrics_key_destroy); + + /* Create a message queue */ + hs->qid_metrics = mk_mq_create(hs->ctx, "/metrics", + cb_mq_metrics, NULL); + + /* HTTP end-points */ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics/prometheus", + cb_metrics_prometheus, hs); + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics", cb_metrics, hs); + + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/metrics.h b/fluent-bit/src/http_server/api/v1/metrics.h new file mode 100644 index 00000000..f6bb0d01 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/metrics.h @@ -0,0 +1,30 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V1_METRICS_H +#define FLB_HS_API_V1_METRICS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> +#include <fluent-bit/flb_sds.h> + +int api_v1_metrics(struct flb_hs *hs); +flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt); + +#endif diff --git a/fluent-bit/src/http_server/api/v1/plugins.c b/fluent-bit/src/http_server/api/v1/plugins.c new file mode 100644 index 00000000..1b63843c --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/plugins.c @@ -0,0 +1,109 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +/* API: List all built-in plugins */ +static void cb_plugins(mk_request_t *request, void *data) +{ + int len; + flb_sds_t out_buf; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct mk_list *head; + struct flb_input_plugin *in; + struct flb_filter_plugin *filter; + struct flb_output_plugin *out; + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "plugins", 7); + + /* + * plugins are: inputs, filters, outputs + */ + msgpack_pack_map(&mp_pck, 3); + + /* Inputs */ + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "inputs", 6); + len = mk_list_size(&config->in_plugins); + msgpack_pack_array(&mp_pck, len); + mk_list_foreach(head, &hs->config->in_plugins) { + in = mk_list_entry(head, struct flb_input_plugin, _head); + len = strlen(in->name); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, in->name, len); + } + + /* Filters */ + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "filters", 7); + len = mk_list_size(&config->filter_plugins); + msgpack_pack_array(&mp_pck, len); + mk_list_foreach(head, &config->filter_plugins) { + filter = mk_list_entry(head, struct flb_filter_plugin, _head); + len = strlen(filter->name); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, filter->name, len); + } + + /* Outputs */ + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "outputs", 7); + len = mk_list_size(&config->out_plugins); + msgpack_pack_array(&mp_pck, len); + mk_list_foreach(head, &config->out_plugins) { + out = mk_list_entry(head, struct flb_output_plugin, _head); + len = strlen(out->name); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, out->name, len); + } + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + + mk_http_status(request, 200); + mk_http_send(request, + out_buf, flb_sds_len(out_buf), NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +/* Perform registration */ +int api_v1_plugins(struct flb_hs *hs) +{ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/plugins", cb_plugins, hs); + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/plugins.h b/fluent-bit/src/http_server/api/v1/plugins.h new file mode 100644 index 00000000..d2094032 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/plugins.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V1_PLUGINS_H +#define FLB_HS_API_V1_PLUGINS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v1_plugins(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v1/register.c b/fluent-bit/src/http_server/api/v1/register.c new file mode 100644 index 00000000..093644b3 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/register.c @@ -0,0 +1,49 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +#include "uptime.h" +#include "metrics.h" +#include "storage.h" +#include "plugins.h" +#include "health.h" +#include "trace.h" + +int api_v1_registration(struct flb_hs *hs) +{ + api_v1_uptime(hs); + api_v1_metrics(hs); + api_v1_plugins(hs); + +#ifdef FLB_HAVE_CHUNK_TRACE + api_v1_trace(hs); +#endif /* FLB_HAVE_CHUNK_TRACE */ + + if (hs->config->health_check == FLB_TRUE) { + api_v1_health(hs); + } + + if (hs->config->storage_metrics == FLB_TRUE) { + api_v1_storage_metrics(hs); + } + + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/register.h b/fluent-bit/src/http_server/api/v1/register.h new file mode 100644 index 00000000..978db9e0 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/register.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_API_V1_REG_H +#define FLB_API_V1_REG_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v1_registration(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v1/storage.c b/fluent-bit/src/http_server/api/v1/storage.c new file mode 100644 index 00000000..df47859d --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/storage.c @@ -0,0 +1,204 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_sds.h> +#include "storage.h" + +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +pthread_key_t hs_storage_metrics_key; + +/* Return the newest storage metrics buffer */ +static struct flb_hs_buf *storage_metrics_get_latest() +{ + struct flb_hs_buf *buf; + struct mk_list *metrics_list; + + metrics_list = pthread_getspecific(hs_storage_metrics_key); + if (!metrics_list) { + return NULL; + } + + if (mk_list_size(metrics_list) == 0) { + return NULL; + } + + buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head); + return buf; +} + +/* Delete unused metrics, note that we only care about the latest node */ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct flb_hs_buf *last; + struct flb_hs_buf *entry; + + metrics_list = pthread_getspecific(hs_storage_metrics_key); + if (!metrics_list) { + return -1; + } + + last = storage_metrics_get_latest(); + if (!last) { + return -1; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != last && entry->users == 0) { + mk_list_del(&entry->_head); + flb_sds_destroy(entry->data); + flb_free(entry->raw_data); + flb_free(entry); + c++; + } + } + + return c; +} + +/* + * Callback invoked every time some storage metrics are received through a + * message queue channel. This function runs in a Monkey HTTP thread + * worker and it purpose is to take the metrics data and store it + * somewhere so then it can be available by the end-points upon + * HTTP client requests. + */ +static void cb_mq_storage_metrics(mk_mq_t *queue, void *data, size_t size) +{ + flb_sds_t out_data; + struct flb_hs_buf *buf; + struct mk_list *metrics_list = NULL; + + metrics_list = pthread_getspecific(hs_storage_metrics_key); + if (!metrics_list) { + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (!metrics_list) { + flb_errno(); + return; + } + mk_list_init(metrics_list); + pthread_setspecific(hs_storage_metrics_key, metrics_list); + } + + /* Convert msgpack to JSON */ + out_data = flb_msgpack_raw_to_json_sds(data, size); + if (!out_data) { + return; + } + + buf = flb_malloc(sizeof(struct flb_hs_buf)); + if (!buf) { + flb_errno(); + flb_sds_destroy(out_data); + return; + } + buf->users = 0; + buf->data = out_data; + + buf->raw_data = flb_malloc(size); + memcpy(buf->raw_data, data, size); + buf->raw_size = size; + + mk_list_add(&buf->_head, metrics_list); + + cleanup_metrics(); +} + +/* FIXME: pending implementation of metrics exit interface +static void cb_mq_storage_metrics_exit(mk_mq_t *queue, void *data) +{ + +} +*/ + +/* API: expose built-in storage metrics /api/v1/storage */ +static void cb_storage(mk_request_t *request, void *data) +{ + struct flb_hs_buf *buf; + + buf = storage_metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL); + mk_http_done(request); + + buf->users--; +} + +static void hs_storage_metrics_key_destroy(void *data) +{ + struct mk_list *metrics_list = (struct mk_list*)data; + struct mk_list *tmp; + struct mk_list *head; + struct flb_hs_buf *entry; + + if (metrics_list == NULL) { + return; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != NULL) { + if (entry->raw_data != NULL) { + flb_free(entry->raw_data); + entry->raw_data = NULL; + } + if (entry->data) { + flb_sds_destroy(entry->data); + entry->data = NULL; + } + mk_list_del(&entry->_head); + flb_free(entry); + } + } + + flb_free(metrics_list); +} + +/* Perform registration */ +int api_v1_storage_metrics(struct flb_hs *hs) +{ + pthread_key_create(&hs_storage_metrics_key, hs_storage_metrics_key_destroy); + + /* Create a message queue */ + hs->qid_storage = mk_mq_create(hs->ctx, "/storage", + cb_mq_storage_metrics, + NULL); + + /* HTTP end-point */ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/storage", cb_storage, hs); + + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/storage.h b/fluent-bit/src/http_server/api/v1/storage.h new file mode 100644 index 00000000..27410c79 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/storage.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V1_STORAGE_METRICS_H +#define FLB_HS_API_V1_STORAGE_METRICS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v1_storage_metrics(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v1/trace.c b/fluent-bit/src/http_server/api/v1/trace.c new file mode 100644 index 00000000..95da1734 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/trace.c @@ -0,0 +1,615 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_server.h> +#include <fluent-bit/flb_lib.h> +#include <fluent-bit/flb_chunk_trace.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_utils.h> +#include <msgpack.h> + + +static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name) +{ + struct mk_list *head; + struct flb_input_instance *in; + + + mk_list_foreach(head, &hs->config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + if (strcmp(name, in->name) == 0) { + return in; + } + if (in->alias) { + if (strcmp(name, in->alias) == 0) { + return in; + } + } + } + return NULL; +} + +static int enable_trace_input(struct flb_hs *hs, const char *name, const char *prefix, const char *output_name, struct mk_list *props) +{ + struct flb_input_instance *in; + + + in = find_input(hs, name); + if (in == NULL) { + return 404; + } + + flb_chunk_trace_context_new(in, output_name, prefix, NULL, props); + return (in->chunk_trace_ctxt == NULL ? 503 : 0); +} + +static int disable_trace_input(struct flb_hs *hs, const char *name) +{ + struct flb_input_instance *in; + + + in = find_input(hs, name); + if (in == NULL) { + return 404; + } + + if (in->chunk_trace_ctxt != NULL) { + flb_chunk_trace_context_destroy(in); + } + return 201; +} + +static flb_sds_t get_input_name(mk_request_t *request) +{ + const char *base = "/api/v1/trace/"; + + + if (request->real_path.data == NULL) { + return NULL; + } + if (request->real_path.len < strlen(base)) { + return NULL; + } + + return flb_sds_create_len(&request->real_path.data[strlen(base)], + request->real_path.len - strlen(base)); +} + +static int http_disable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +{ + struct flb_hs *hs = data; + int toggled_on = 503; + + + toggled_on = disable_trace_input(hs, input_name); + if (toggled_on < 300) { + msgpack_pack_map(mp_pck, 1); + msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + return 201; + } + + return toggled_on; +} + +static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, const char *input_name) +{ + int ret = -1; + int i; + int x; + flb_sds_t prefix = NULL; + flb_sds_t output_name = NULL; + int toggled_on = -1; + msgpack_object *key; + msgpack_object *val; + struct mk_list *props = NULL; + msgpack_object_kv *param; + msgpack_object_str *param_key; + msgpack_object_str *param_val; + + + if (result->data.type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < result->data.via.map.size; i++) { + key = &result->data.via.map.ptr[i].key; + val = &result->data.via.map.ptr[i].val; + + if (key->type != MSGPACK_OBJECT_STR) { + ret = -1; + goto parse_error; + } + + if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_STR) { + ret = -1; + goto parse_error; + } + if (prefix != NULL) { + flb_sds_destroy(prefix); + } + prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size); + } + else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_STR) { + ret = -1; + goto parse_error; + } + if (output_name != NULL) { + flb_sds_destroy(output_name); + } + output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size); + } + else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_MAP) { + ret = -1; + goto parse_error; + } + if (props != NULL) { + flb_free(props); + } + props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(props); + for (x = 0; x < val->via.map.size; x++) { + param = &val->via.map.ptr[x]; + if (param->val.type != MSGPACK_OBJECT_STR) { + ret = -1; + goto parse_error; + } + if (param->key.type != MSGPACK_OBJECT_STR) { + ret = -1; + goto parse_error; + } + param_key = ¶m->key.via.str; + param_val = ¶m->val.via.str; + flb_kv_item_create_len(props, + (char *)param_key->ptr, param_key->size, + (char *)param_val->ptr, param_val->size); + } + } + } + + if (output_name == NULL) { + output_name = flb_sds_create("stdout"); + } + + toggled_on = enable_trace_input(hs, input_name, prefix, output_name, props); + if (!toggled_on) { + ret = -1; + goto parse_error; + } + } + +parse_error: + if (prefix) flb_sds_destroy(prefix); + if (output_name) flb_sds_destroy(output_name); + if (props != NULL) { + flb_kv_release(props); + flb_free(props); + } + return ret; +} + +static int http_enable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +{ + char *buf = NULL; + size_t buf_size; + msgpack_unpacked result; + int ret = -1; + int rc = -1; + int i; + int x; + size_t off = 0; + int root_type = MSGPACK_OBJECT_ARRAY; + struct flb_hs *hs = data; + flb_sds_t prefix = NULL; + flb_sds_t output_name = NULL; + msgpack_object *key; + msgpack_object *val; + struct mk_list *props = NULL; + struct flb_chunk_trace_limit limit = { 0 }; + struct flb_input_instance *input_instance; + + + if (request->method == MK_METHOD_GET) { + ret = enable_trace_input(hs, input_name, "trace.", "stdout", NULL); + if (ret == 0) { + msgpack_pack_map(mp_pck, 1); + msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + return 200; + } + else { + flb_error("unable to enable tracing for %s", input_name); + goto input_error; + } + } + + msgpack_unpacked_init(&result); + rc = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size, + &root_type, NULL); + if (rc == -1) { + ret = 503; + flb_error("unable to parse json parameters"); + goto unpack_error; + } + + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + ret = 503; + flb_error("unable to unpack msgpack parameters for %s", input_name); + goto unpack_error; + } + + if (result.data.type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < result.data.via.map.size; i++) { + key = &result.data.via.map.ptr[i].key; + val = &result.data.via.map.ptr[i].val; + + if (key->type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("non string key in parameters"); + goto parse_error; + } + + if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("prefix is not a string"); + goto parse_error; + } + if (prefix != NULL) { + flb_sds_destroy(prefix); + } + prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size); + } + else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("output is not a string"); + goto parse_error; + } + if (output_name != NULL) { + flb_sds_destroy(output_name); + } + output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size); + } + else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_MAP) { + ret = 503; + flb_error("output params is not a maps"); + goto parse_error; + } + props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(props); + for (x = 0; x < val->via.map.size; x++) { + if (val->via.map.ptr[x].val.type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("output parameter key is not a string"); + goto parse_error; + } + if (val->via.map.ptr[x].key.type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("output parameter value is not a string"); + goto parse_error; + } + flb_kv_item_create_len(props, + (char *)val->via.map.ptr[x].key.via.str.ptr, val->via.map.ptr[x].key.via.str.size, + (char *)val->via.map.ptr[x].val.via.str.ptr, val->via.map.ptr[x].val.via.str.size); + } + } + else if (strncmp(key->via.str.ptr, "limit", key->via.str.size) == 0) { + if (val->type != MSGPACK_OBJECT_MAP) { + ret = 503; + flb_error("limit must be a map of limit types"); + goto parse_error; + } + if (val->via.map.size != 1) { + ret = 503; + flb_error("limit must have a single limit type"); + goto parse_error; + } + if (val->via.map.ptr[0].key.type != MSGPACK_OBJECT_STR) { + ret = 503; + flb_error("limit type (key) must be a string"); + goto parse_error; + } + if (val->via.map.ptr[0].val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + ret = 503; + flb_error("limit type must be an integer"); + goto parse_error; + } + if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "seconds", val->via.map.ptr[0].key.via.str.size) == 0) { + limit.type = FLB_CHUNK_TRACE_LIMIT_TIME; + limit.seconds = val->via.map.ptr[0].val.via.u64; + } + else if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "count", val->via.map.ptr[0].key.via.str.size) == 0) { + limit.type = FLB_CHUNK_TRACE_LIMIT_COUNT; + limit.count = val->via.map.ptr[0].val.via.u64; + } + else { + ret = 503; + flb_error("unknown limit type"); + goto parse_error; + } + } + } + + if (output_name == NULL) { + output_name = flb_sds_create("stdout"); + } + + ret = enable_trace_input(hs, input_name, prefix, output_name, props); + if (ret != 0) { + flb_error("error when enabling tracing"); + goto parse_error; + } + + if (limit.type != 0) { + input_instance = find_input(hs, input_name); + if (limit.type == FLB_CHUNK_TRACE_LIMIT_TIME) { + flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.seconds); + } + else if (limit.type == FLB_CHUNK_TRACE_LIMIT_COUNT) { + flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.count); + } + } + } + + msgpack_pack_map(mp_pck, 1); + msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + + ret = 200; +parse_error: + if (prefix) flb_sds_destroy(prefix); + if (output_name) flb_sds_destroy(output_name); + if (props != NULL) { + flb_kv_release(props); + flb_free(props); + } +unpack_error: + msgpack_unpacked_destroy(&result); + if (buf != NULL) { + flb_free(buf); + } +input_error: + return ret; +} + +static void cb_trace(mk_request_t *request, void *data) +{ + flb_sds_t out_buf; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + int response = 404; + flb_sds_t input_name = NULL; + + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + input_name = get_input_name(request); + if (input_name == NULL) { + response = 404; + goto error; + } + + if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { + response = http_enable_trace(request, data, input_name, &mp_pck); + } + else if (request->method == MK_METHOD_DELETE) { + response = http_disable_trace(request, data, input_name, &mp_pck); + } +error: + if (response == 404) { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + } + else if (response == 503) { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + } + + if (input_name != NULL) { + flb_sds_destroy(input_name); + } + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (out_buf == NULL) { + mk_http_status(request, 503); + mk_http_done(request); + return; + } + + mk_http_status(request, response); + mk_http_send(request, out_buf, flb_sds_len(out_buf), NULL); + mk_http_done(request); + + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(out_buf); +} + +static void cb_traces(mk_request_t *request, void *data) +{ + flb_sds_t out_buf; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + int ret; + char *buf = NULL; + size_t buf_size; + int root_type = MSGPACK_OBJECT_ARRAY; + msgpack_unpacked result; + flb_sds_t error_msg = NULL; + int response = 200; + flb_sds_t input_name; + msgpack_object_array *inputs = NULL; + size_t off = 0; + int i; + + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_unpacked_init(&result); + ret = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size, + &root_type, NULL); + if (ret == -1) { + goto unpack_error; + } + + ret = msgpack_unpack_next(&result, buf, buf_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + ret = -1; + error_msg = flb_sds_create("unfinished input"); + goto unpack_error; + } + + if (result.data.type != MSGPACK_OBJECT_MAP) { + response = 503; + error_msg = flb_sds_create("input is not an object"); + goto unpack_error; + } + + for (i = 0; i < result.data.via.map.size; i++) { + if (result.data.via.map.ptr[i].val.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + if (result.data.via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) { + continue; + } + if (result.data.via.map.ptr[i].key.via.str.size < strlen("inputs")) { + continue; + } + if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, "inputs", strlen("inputs"))) { + continue; + } + inputs = &result.data.via.map.ptr[i].val.via.array; + } + + if (inputs == NULL) { + response = 503; + error_msg = flb_sds_create("inputs not found"); + goto unpack_error; + } + + msgpack_pack_map(&mp_pck, 2); + + msgpack_pack_str_with_body(&mp_pck, "inputs", strlen("inputs")); + msgpack_pack_map(&mp_pck, inputs->size); + + for (i = 0; i < inputs->size; i++) { + input_name = flb_sds_create_len(inputs->ptr[i].via.str.ptr, inputs->ptr[i].via.str.size); + msgpack_pack_str_with_body(&mp_pck, input_name, flb_sds_len(input_name)); + + if (inputs->ptr[i].type != MSGPACK_OBJECT_STR) { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + } + else { + if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { + ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, input_name); + if (ret != 0) { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + msgpack_pack_str_with_body(&mp_pck, "returncode", strlen("returncode")); + msgpack_pack_int64(&mp_pck, ret); + } + else { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); + } + } + else if (request->method == MK_METHOD_DELETE) { + disable_trace_input((struct flb_hs *)data, input_name); + } + else { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); + msgpack_pack_str_with_body(&mp_pck, "method not allowed", strlen("method not allowed")); + } + } + } + + msgpack_pack_str_with_body(&mp_pck, "result", strlen("result")); +unpack_error: + if (buf != NULL) { + flb_free(buf); + } + msgpack_unpacked_destroy(&result); + if (response == 404) { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + } + else if (response == 503) { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); + if (error_msg) { + msgpack_pack_str_with_body(&mp_pck, error_msg, flb_sds_len(error_msg)); + flb_sds_destroy(error_msg); + } + else { + msgpack_pack_str_with_body(&mp_pck, "unknown error", strlen("unknown error")); + } + } + else { + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); + msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); + } + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (out_buf == NULL) { + out_buf = flb_sds_create("serialization error"); + } + msgpack_sbuffer_destroy(&mp_sbuf); + + mk_http_status(request, response); + mk_http_send(request, + out_buf, flb_sds_len(out_buf), NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +/* Perform registration */ +int api_v1_trace(struct flb_hs *hs) +{ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs); + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs); + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/trace.h b/fluent-bit/src/http_server/api/v1/trace.h new file mode 100644 index 00000000..236925de --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/trace.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V1_TRACE_H +#define FLB_HS_API_V1_TRACE_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v1_trace(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v1/uptime.c b/fluent-bit/src/http_server/api/v1/uptime.c new file mode 100644 index 00000000..37f97187 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/uptime.c @@ -0,0 +1,111 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_http_server.h> +#include <fluent-bit/flb_mem.h> + +#define FLB_UPTIME_ONEDAY 86400 +#define FLB_UPTIME_ONEHOUR 3600 +#define FLB_UPTIME_ONEMINUTE 60 + +/* Append human-readable uptime */ +static void uptime_hr(time_t uptime, msgpack_packer *mp_pck) +{ + int len; + int days; + int hours; + int minutes; + int seconds; + long int upmind; + long int upminh; + char buf[256]; + + /* days */ + days = uptime / FLB_UPTIME_ONEDAY; + upmind = uptime - (days * FLB_UPTIME_ONEDAY); + + /* hours */ + hours = upmind / FLB_UPTIME_ONEHOUR; + upminh = upmind - hours * FLB_UPTIME_ONEHOUR; + + /* minutes */ + minutes = upminh / FLB_UPTIME_ONEMINUTE; + seconds = upminh - minutes * FLB_UPTIME_ONEMINUTE; + + len = snprintf(buf, sizeof(buf) - 1, + "Fluent Bit has been running: " + " %i day%s, %i hour%s, %i minute%s and %i second%s", + days, (days > 1) ? "s" : "", hours, \ + (hours > 1) ? "s" : "", minutes, \ + (minutes > 1) ? "s" : "", seconds, \ + (seconds > 1) ? "s" : ""); + msgpack_pack_str(mp_pck, 9); + msgpack_pack_str_body(mp_pck, "uptime_hr", 9); + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, buf, len); +} + +/* API: List all built-in plugins */ +static void cb_uptime(mk_request_t *request, void *data) +{ + flb_sds_t out_buf; + size_t out_size; + time_t uptime; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "uptime_sec", 10); + + uptime = time(NULL) - config->init_time; + msgpack_pack_uint64(&mp_pck, uptime); + + uptime_hr(uptime, &mp_pck); + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + return; + } + out_size = flb_sds_len(out_buf); + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, out_buf, out_size, NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +/* Perform registration */ +int api_v1_uptime(struct flb_hs *hs) +{ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/uptime", cb_uptime, hs); + return 0; +} diff --git a/fluent-bit/src/http_server/api/v1/uptime.h b/fluent-bit/src/http_server/api/v1/uptime.h new file mode 100644 index 00000000..5a424779 --- /dev/null +++ b/fluent-bit/src/http_server/api/v1/uptime.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V1_UPTIME_H +#define FLB_HS_API_V1_UPTIME_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v1_uptime(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v2/CMakeLists.txt b/fluent-bit/src/http_server/api/v2/CMakeLists.txt new file mode 100644 index 00000000..a9d590fb --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/CMakeLists.txt @@ -0,0 +1,10 @@ +# api/v2 +set(src + metrics.c + reload.c + register.c + ) + +include_directories(${MONKEY_INCLUDE_DIR}) +add_library(api-v2 STATIC ${src}) +target_link_libraries(api-v2 monkey-core-static fluent-bit-static) diff --git a/fluent-bit/src/http_server/api/v2/metrics.c b/fluent-bit/src/http_server/api/v2/metrics.c new file mode 100644 index 00000000..27513b7a --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/metrics.c @@ -0,0 +1,259 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_time.h> +#include "metrics.h" + +#include <fluent-bit/flb_http_server.h> + +#define null_check(x) do { if (!x) { goto error; } else {sds = x;} } while (0) + +pthread_key_t hs_metrics_v2_key; + +static struct mk_list *hs_metrics_v2_key_create() +{ + struct mk_list *metrics_list = NULL; + + metrics_list = flb_malloc(sizeof(struct mk_list)); + if (metrics_list == NULL) { + flb_errno(); + return NULL; + } + mk_list_init(metrics_list); + pthread_setspecific(hs_metrics_v2_key, metrics_list); + + return metrics_list; +} + +static void hs_metrics_v2_key_destroy(void *data) +{ + struct mk_list *metrics_list = (struct mk_list*) data; + struct mk_list *tmp; + struct mk_list *head; + struct flb_hs_buf *entry; + + if (metrics_list == NULL) { + return; + } + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != NULL) { + if (entry->raw_data != NULL) { + cmt_destroy(entry->raw_data); + entry->raw_data = NULL; + } + mk_list_del(&entry->_head); + flb_free(entry); + } + } + + flb_free(metrics_list); +} + +/* Return the newest metrics buffer */ +static struct flb_hs_buf *metrics_get_latest() +{ + struct flb_hs_buf *buf; + struct mk_list *metrics_list; + + metrics_list = pthread_getspecific(hs_metrics_v2_key); + if (!metrics_list) { + return NULL; + } + + if (mk_list_size(metrics_list) == 0) { + return NULL; + } + + buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head); + return buf; +} + +/* Delete unused metrics, note that we only care about the latest node */ +static int cleanup_metrics() +{ + int c = 0; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *metrics_list; + struct flb_hs_buf *last; + struct flb_hs_buf *entry; + + metrics_list = pthread_getspecific(hs_metrics_v2_key); + if (!metrics_list) { + return -1; + } + + last = metrics_get_latest(); + if (!last) { + return -1; + } + + mk_list_foreach_safe(head, tmp, metrics_list) { + entry = mk_list_entry(head, struct flb_hs_buf, _head); + if (entry != last && entry->users == 0) { + mk_list_del(&entry->_head); + cmt_destroy(entry->raw_data); + flb_free(entry); + c++; + } + } + + return c; +} + +/* + * Callback invoked every time some metrics are received through a message queue channel. + * This function runs in a Monkey HTTP thread worker and it purpose is to take the metrics + * data and store it somewhere so then it can be available by the end-points upon + * HTTP client requests. + */ +static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size) +{ + int ret; + size_t off = 0; + struct cmt *cmt; + struct flb_hs_buf *buf; + struct mk_list *metrics_list = NULL; + + metrics_list = pthread_getspecific(hs_metrics_v2_key); + if (!metrics_list) { + metrics_list = hs_metrics_v2_key_create(); + if (metrics_list == NULL) { + return; + } + } + + /* decode cmetrics */ + ret = cmt_decode_msgpack_create(&cmt, data, size, &off); + if (ret != 0) { + return; + } + + buf = flb_malloc(sizeof(struct flb_hs_buf)); + if (!buf) { + flb_errno(); + return; + } + buf->users = 0; + buf->data = NULL; + + /* Store CMetrics context as the raw_data */ + buf->raw_data = cmt; + buf->raw_size = 0; + + mk_list_add(&buf->_head, metrics_list); + cleanup_metrics(); +} + +/* API: expose metrics in Prometheus format /api/v2/metrics/prometheus */ +static void cb_metrics_prometheus(mk_request_t *request, void *data) +{ + struct cmt *cmt; + struct flb_hs_buf *buf; + cfl_sds_t payload; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + cmt = (struct cmt *) buf->raw_data; + + /* convert CMetrics to text */ + payload = cmt_encode_prometheus_create(cmt, CMT_FALSE); + if (!payload) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_PROMETHEUS); + mk_http_send(request, payload, cfl_sds_len(payload), NULL); + mk_http_done(request); + + cmt_encode_prometheus_destroy(payload); + + buf->users--; +} + +/* API: expose built-in metrics /api/v1/metrics (JSON format) */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct cmt *cmt; + struct flb_hs_buf *buf; + cfl_sds_t payload; + + buf = metrics_get_latest(); + if (!buf) { + mk_http_status(request, 404); + mk_http_done(request); + return; + } + + cmt = (struct cmt *) buf->raw_data; + + /* convert CMetrics to text */ + payload = cmt_encode_text_create(cmt); + if (!payload) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + buf->users++; + + mk_http_status(request, 200); + mk_http_send(request, payload, cfl_sds_len(payload), NULL); + mk_http_done(request); + + cmt_encode_text_destroy(payload); + + buf->users--; +} + +/* Perform registration */ +int api_v2_metrics(struct flb_hs *hs) +{ + + pthread_key_create(&hs_metrics_v2_key, hs_metrics_v2_key_destroy); + + /* Create a message queue */ + hs->qid_metrics_v2 = mk_mq_create(hs->ctx, "/metrics_v2", + cb_mq_metrics, NULL); + /* HTTP end-points */ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/metrics/prometheus", + cb_metrics_prometheus, hs); + + mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/metrics", cb_metrics, hs); + + return 0; +} diff --git a/fluent-bit/src/http_server/api/v2/metrics.h b/fluent-bit/src/http_server/api/v2/metrics.h new file mode 100644 index 00000000..5336865d --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/metrics.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V2_METRICS_H +#define FLB_HS_API_V2_METRICS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v2_metrics(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v2/register.c b/fluent-bit/src/http_server/api/v2/register.c new file mode 100644 index 00000000..7a0956fb --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/register.c @@ -0,0 +1,31 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +#include "metrics.h" +#include "reload.h" + +int api_v2_registration(struct flb_hs *hs) +{ + api_v2_reload(hs); + api_v2_metrics(hs); + return 0; +} diff --git a/fluent-bit/src/http_server/api/v2/register.h b/fluent-bit/src/http_server/api/v2/register.h new file mode 100644 index 00000000..da6d78f3 --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/register.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_API_V2_REG_H +#define FLB_API_V2_REG_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v2_registration(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/api/v2/reload.c b/fluent-bit/src/http_server/api/v2/reload.c new file mode 100644 index 00000000..3bb5159f --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/reload.c @@ -0,0 +1,161 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_lib.h> +#include <fluent-bit/flb_reload.h> +#include "reload.h" + +#include <signal.h> + +#include <fluent-bit/flb_http_server.h> + +static void handle_reload_request(mk_request_t *request, struct flb_config *config) +{ + int ret; + flb_sds_t out_buf; + size_t out_size; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "reload", 6); + +#ifdef FLB_SYSTEM_WINDOWS + ret = -1; + + msgpack_pack_str(&mp_pck, 11); + msgpack_pack_str_body(&mp_pck, "unsupported", 11); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, ret); +#else + if (config->enable_hot_reload != FLB_TRUE) { + msgpack_pack_str(&mp_pck, 11); + msgpack_pack_str_body(&mp_pck, "not enabled", 11); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, -1); + } + else { + ret = kill(getpid(), SIGHUP); + if (ret != 0) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "done", 4); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, ret); + } + +#endif + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + mk_http_status(request, 400); + mk_http_done(request); + return; + } + out_size = flb_sds_len(out_buf); + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, out_buf, out_size, NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +static void handle_get_reload_status(mk_request_t *request, struct flb_config *config) +{ + flb_sds_t out_buf; + size_t out_size; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 16); + msgpack_pack_str_body(&mp_pck, "hot_reload_count", 16); + msgpack_pack_int64(&mp_pck, config->hot_reloaded_count); + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + mk_http_status(request, 400); + mk_http_done(request); + return; + } + out_size = flb_sds_len(out_buf); + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, out_buf, out_size, NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +static void cb_reload(mk_request_t *request, void *data) +{ + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + if (request->method == MK_METHOD_POST || + request->method == MK_METHOD_PUT) { + handle_reload_request(request, config); + } + else if (request->method == MK_METHOD_GET) { + handle_get_reload_status(request, config); + } + else { + mk_http_status(request, 400); + mk_http_done(request); + } +} + +/* Perform registration */ +int api_v2_reload(struct flb_hs *hs) +{ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/reload", cb_reload, hs); + + return 0; +} diff --git a/fluent-bit/src/http_server/api/v2/reload.h b/fluent-bit/src/http_server/api/v2/reload.h new file mode 100644 index 00000000..e64e867d --- /dev/null +++ b/fluent-bit/src/http_server/api/v2/reload.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V2_RELOAD_H +#define FLB_HS_API_V2_RELOAD_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_http_server.h> + +int api_v2_reload(struct flb_hs *hs); + +#endif diff --git a/fluent-bit/src/http_server/flb_hs.c b/fluent-bit/src/http_server/flb_hs.c new file mode 100644 index 00000000..40cc8467 --- /dev/null +++ b/fluent-bit/src/http_server/flb_hs.c @@ -0,0 +1,147 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_http_server.h> + +#include <monkey/mk_lib.h> + +/* v1 */ +#include "api/v1/register.h" +#include "api/v1/health.h" + +/* v2 */ +#include "api/v2/register.h" + +static void cb_root(mk_request_t *request, void *data) +{ + struct flb_hs *hs = data; + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, hs->ep_root_buf, hs->ep_root_size, NULL); + mk_http_done(request); +} + +/* Ingest health metrics into the web service context */ +int flb_hs_push_health_metrics(struct flb_hs *hs, void *data, size_t size) +{ + return mk_mq_send(hs->ctx, hs->qid_health, data, size); +} + +/* Ingest pipeline metrics into the web service context */ +int flb_hs_push_pipeline_metrics(struct flb_hs *hs, void *data, size_t size) +{ + return mk_mq_send(hs->ctx, hs->qid_metrics, data, size); +} + +/* Ingest pipeline metrics into the web service context */ +int flb_hs_push_metrics(struct flb_hs *hs, void *data, size_t size) +{ + return mk_mq_send(hs->ctx, hs->qid_metrics_v2, data, size); +} + +/* Ingest storage metrics into the web service context */ +int flb_hs_push_storage_metrics(struct flb_hs *hs, void *data, size_t size) +{ + return mk_mq_send(hs->ctx, hs->qid_storage, data, size); +} + +/* Create ROOT endpoints */ +struct flb_hs *flb_hs_create(const char *listen, const char *tcp_port, + struct flb_config *config) +{ + int vid; + char tmp[32]; + struct flb_hs *hs; + + hs = flb_calloc(1, sizeof(struct flb_hs)); + if (!hs) { + flb_errno(); + return NULL; + } + hs->config = config; + + /* Setup endpoint specific data */ + flb_hs_endpoints(hs); + + /* Create HTTP server context */ + hs->ctx = mk_create(); + if (!hs->ctx) { + flb_error("[http_server] could not create context"); + flb_free(hs); + return NULL; + } + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) -1, "%s:%s", listen, tcp_port); + mk_config_set(hs->ctx, "Listen", tmp, NULL); + vid = mk_vhost_create(hs->ctx, NULL); + hs->vid = vid; + + /* Setup virtual host */ + mk_vhost_set(hs->ctx, vid, + "Name", "fluent-bit", + NULL); + + + /* Register endpoints for /api/v1 */ + api_v1_registration(hs); + + /* Register endpoints for /api/v2 */ + api_v2_registration(hs); + + /* Root */ + mk_vhost_handler(hs->ctx, vid, "/", cb_root, hs); + + return hs; +} + +int flb_hs_start(struct flb_hs *hs) +{ + int ret; + struct flb_config *config = hs->config; + + ret = mk_start(hs->ctx); + + if (ret == 0) { + flb_info("[http_server] listen iface=%s tcp_port=%s", + config->http_listen, config->http_port); + } + return ret; +} + +int flb_hs_destroy(struct flb_hs *hs) +{ + if (!hs) { + return 0; + } + flb_hs_health_destroy(); + mk_stop(hs->ctx); + mk_destroy(hs->ctx); + + flb_hs_endpoints_free(hs); + flb_free(hs); + + + return 0; +} diff --git a/fluent-bit/src/http_server/flb_hs_endpoints.c b/fluent-bit/src/http_server/flb_hs_endpoints.c new file mode 100644 index 00000000..2ea12a98 --- /dev/null +++ b/fluent-bit/src/http_server/flb_hs_endpoints.c @@ -0,0 +1,121 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2020 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_http_server.h> +#include <msgpack.h> + +/* Create a JSON buffer with informational data about the running service */ +static int endpoint_root(struct flb_hs *hs) +{ + int c; + flb_sds_t out_buf; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct mk_list *head; + struct mk_list *list; + struct flb_split_entry *entry; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "fluent-bit", 10); + + /* entries under fluent-bit parent: + * + * - version + * - edition + * - built flags + */ + msgpack_pack_map(&mp_pck, 3); + + /* fluent-bit['version'] */ + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "version", 7); + msgpack_pack_str(&mp_pck, sizeof(FLB_VERSION_STR) - 1); + msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, sizeof(FLB_VERSION_STR) - 1); + + /* fluent-bit['edition'] */ + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "edition", 7); +#ifdef FLB_ENTERPRISE + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "Enterprise", 10); +#else + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "Community", 9); +#endif + + /* fluent-bit['flags'] */ + msgpack_pack_str(&mp_pck, 5); + msgpack_pack_str_body(&mp_pck, "flags", 5); + + c = 0; + list = flb_utils_split(FLB_INFO_FLAGS, ' ', -1); + mk_list_foreach(head, list) { + entry = mk_list_entry(head, struct flb_split_entry, _head); + if (strncmp(entry->value, "FLB_", 4) == 0) { + c++; + } + } + + msgpack_pack_array(&mp_pck, c); + mk_list_foreach(head, list) { + entry = mk_list_entry(head, struct flb_split_entry, _head); + if (strncmp(entry->value, "FLB_", 4) == 0) { + msgpack_pack_str(&mp_pck, entry->len); + msgpack_pack_str_body(&mp_pck, entry->value, entry->len); + } + } + flb_utils_split_free(list); + + /* export as JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + + if (out_buf) { + hs->ep_root_buf = out_buf; + hs->ep_root_size = flb_sds_len(out_buf); + } + + return -1; +} + +int flb_hs_endpoints(struct flb_hs *hs) +{ + endpoint_root(hs); + return 0; +} + +/* Release cached data from endpoints */ +int flb_hs_endpoints_free(struct flb_hs *hs) +{ + if (hs->ep_root_buf) { + flb_sds_destroy(hs->ep_root_buf); + } + + return 0; +} diff --git a/fluent-bit/src/http_server/flb_hs_utils.c b/fluent-bit/src/http_server/flb_hs_utils.c new file mode 100644 index 00000000..7b39bff2 --- /dev/null +++ b/fluent-bit/src/http_server/flb_hs_utils.c @@ -0,0 +1,48 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_http_server.h> +#include <monkey/mk_lib.h> + +int flb_hs_add_content_type_to_req(mk_request_t *request, int type) +{ + if (request == NULL) { + return -1; + } + + switch (type) { + case FLB_HS_CONTENT_TYPE_JSON: + mk_http_header(request, + FLB_HS_CONTENT_TYPE_KEY_STR, FLB_HS_CONTENT_TYPE_KEY_LEN, + FLB_HS_CONTENT_TYPE_JSON_STR, FLB_HS_CONTENT_TYPE_JSON_LEN); + break; + case FLB_HS_CONTENT_TYPE_PROMETHEUS: + mk_http_header(request, + FLB_HS_CONTENT_TYPE_KEY_STR, FLB_HS_CONTENT_TYPE_KEY_LEN, + FLB_HS_CONTENT_TYPE_PROMETHEUS_STR, FLB_HS_CONTENT_TYPE_PROMETHEUS_LEN); + break; + default: + flb_error("[%s] unknown type=%d", __FUNCTION__, type); + return -1; + } + + return 0; +} |