summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/http_server/api/v1/trace.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/http_server/api/v1/trace.c')
-rw-r--r--fluent-bit/src/http_server/api/v1/trace.c615
1 files changed, 615 insertions, 0 deletions
diff --git a/fluent-bit/src/http_server/api/v1/trace.c b/fluent-bit/src/http_server/api/v1/trace.c
new file mode 100644
index 000000000..95da17343
--- /dev/null
+++ b/fluent-bit/src/http_server/api/v1/trace.c
@@ -0,0 +1,615 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_server.h>
+#include <fluent-bit/flb_lib.h>
+#include <fluent-bit/flb_chunk_trace.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_utils.h>
+#include <msgpack.h>
+
+
+static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
+{
+ struct mk_list *head;
+ struct flb_input_instance *in;
+
+
+ mk_list_foreach(head, &hs->config->inputs) {
+ in = mk_list_entry(head, struct flb_input_instance, _head);
+ if (strcmp(name, in->name) == 0) {
+ return in;
+ }
+ if (in->alias) {
+ if (strcmp(name, in->alias) == 0) {
+ return in;
+ }
+ }
+ }
+ return NULL;
+}
+
+static int enable_trace_input(struct flb_hs *hs, const char *name, const char *prefix, const char *output_name, struct mk_list *props)
+{
+ struct flb_input_instance *in;
+
+
+ in = find_input(hs, name);
+ if (in == NULL) {
+ return 404;
+ }
+
+ flb_chunk_trace_context_new(in, output_name, prefix, NULL, props);
+ return (in->chunk_trace_ctxt == NULL ? 503 : 0);
+}
+
+static int disable_trace_input(struct flb_hs *hs, const char *name)
+{
+ struct flb_input_instance *in;
+
+
+ in = find_input(hs, name);
+ if (in == NULL) {
+ return 404;
+ }
+
+ if (in->chunk_trace_ctxt != NULL) {
+ flb_chunk_trace_context_destroy(in);
+ }
+ return 201;
+}
+
+static flb_sds_t get_input_name(mk_request_t *request)
+{
+ const char *base = "/api/v1/trace/";
+
+
+ if (request->real_path.data == NULL) {
+ return NULL;
+ }
+ if (request->real_path.len < strlen(base)) {
+ return NULL;
+ }
+
+ return flb_sds_create_len(&request->real_path.data[strlen(base)],
+ request->real_path.len - strlen(base));
+}
+
+static int http_disable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck)
+{
+ struct flb_hs *hs = data;
+ int toggled_on = 503;
+
+
+ toggled_on = disable_trace_input(hs, input_name);
+ if (toggled_on < 300) {
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+ return 201;
+ }
+
+ return toggled_on;
+}
+
+static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, const char *input_name)
+{
+ int ret = -1;
+ int i;
+ int x;
+ flb_sds_t prefix = NULL;
+ flb_sds_t output_name = NULL;
+ int toggled_on = -1;
+ msgpack_object *key;
+ msgpack_object *val;
+ struct mk_list *props = NULL;
+ msgpack_object_kv *param;
+ msgpack_object_str *param_key;
+ msgpack_object_str *param_val;
+
+
+ if (result->data.type == MSGPACK_OBJECT_MAP) {
+ for (i = 0; i < result->data.via.map.size; i++) {
+ key = &result->data.via.map.ptr[i].key;
+ val = &result->data.via.map.ptr[i].val;
+
+ if (key->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+
+ if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (prefix != NULL) {
+ flb_sds_destroy(prefix);
+ }
+ prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (output_name != NULL) {
+ flb_sds_destroy(output_name);
+ }
+ output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (props != NULL) {
+ flb_free(props);
+ }
+ props = flb_calloc(1, sizeof(struct mk_list));
+ flb_kv_init(props);
+ for (x = 0; x < val->via.map.size; x++) {
+ param = &val->via.map.ptr[x];
+ if (param->val.type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ if (param->key.type != MSGPACK_OBJECT_STR) {
+ ret = -1;
+ goto parse_error;
+ }
+ param_key = &param->key.via.str;
+ param_val = &param->val.via.str;
+ flb_kv_item_create_len(props,
+ (char *)param_key->ptr, param_key->size,
+ (char *)param_val->ptr, param_val->size);
+ }
+ }
+ }
+
+ if (output_name == NULL) {
+ output_name = flb_sds_create("stdout");
+ }
+
+ toggled_on = enable_trace_input(hs, input_name, prefix, output_name, props);
+ if (!toggled_on) {
+ ret = -1;
+ goto parse_error;
+ }
+ }
+
+parse_error:
+ if (prefix) flb_sds_destroy(prefix);
+ if (output_name) flb_sds_destroy(output_name);
+ if (props != NULL) {
+ flb_kv_release(props);
+ flb_free(props);
+ }
+ return ret;
+}
+
+static int http_enable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck)
+{
+ char *buf = NULL;
+ size_t buf_size;
+ msgpack_unpacked result;
+ int ret = -1;
+ int rc = -1;
+ int i;
+ int x;
+ size_t off = 0;
+ int root_type = MSGPACK_OBJECT_ARRAY;
+ struct flb_hs *hs = data;
+ flb_sds_t prefix = NULL;
+ flb_sds_t output_name = NULL;
+ msgpack_object *key;
+ msgpack_object *val;
+ struct mk_list *props = NULL;
+ struct flb_chunk_trace_limit limit = { 0 };
+ struct flb_input_instance *input_instance;
+
+
+ if (request->method == MK_METHOD_GET) {
+ ret = enable_trace_input(hs, input_name, "trace.", "stdout", NULL);
+ if (ret == 0) {
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+ return 200;
+ }
+ else {
+ flb_error("unable to enable tracing for %s", input_name);
+ goto input_error;
+ }
+ }
+
+ msgpack_unpacked_init(&result);
+ rc = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size,
+ &root_type, NULL);
+ if (rc == -1) {
+ ret = 503;
+ flb_error("unable to parse json parameters");
+ goto unpack_error;
+ }
+
+ rc = msgpack_unpack_next(&result, buf, buf_size, &off);
+ if (rc != MSGPACK_UNPACK_SUCCESS) {
+ ret = 503;
+ flb_error("unable to unpack msgpack parameters for %s", input_name);
+ goto unpack_error;
+ }
+
+ if (result.data.type == MSGPACK_OBJECT_MAP) {
+ for (i = 0; i < result.data.via.map.size; i++) {
+ key = &result.data.via.map.ptr[i].key;
+ val = &result.data.via.map.ptr[i].val;
+
+ if (key->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("non string key in parameters");
+ goto parse_error;
+ }
+
+ if (strncmp(key->via.str.ptr, "prefix", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("prefix is not a string");
+ goto parse_error;
+ }
+ if (prefix != NULL) {
+ flb_sds_destroy(prefix);
+ }
+ prefix = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "output", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output is not a string");
+ goto parse_error;
+ }
+ if (output_name != NULL) {
+ flb_sds_destroy(output_name);
+ }
+ output_name = flb_sds_create_len(val->via.str.ptr, val->via.str.size);
+ }
+ else if (strncmp(key->via.str.ptr, "params", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = 503;
+ flb_error("output params is not a maps");
+ goto parse_error;
+ }
+ props = flb_calloc(1, sizeof(struct mk_list));
+ flb_kv_init(props);
+ for (x = 0; x < val->via.map.size; x++) {
+ if (val->via.map.ptr[x].val.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output parameter key is not a string");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[x].key.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("output parameter value is not a string");
+ goto parse_error;
+ }
+ flb_kv_item_create_len(props,
+ (char *)val->via.map.ptr[x].key.via.str.ptr, val->via.map.ptr[x].key.via.str.size,
+ (char *)val->via.map.ptr[x].val.via.str.ptr, val->via.map.ptr[x].val.via.str.size);
+ }
+ }
+ else if (strncmp(key->via.str.ptr, "limit", key->via.str.size) == 0) {
+ if (val->type != MSGPACK_OBJECT_MAP) {
+ ret = 503;
+ flb_error("limit must be a map of limit types");
+ goto parse_error;
+ }
+ if (val->via.map.size != 1) {
+ ret = 503;
+ flb_error("limit must have a single limit type");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[0].key.type != MSGPACK_OBJECT_STR) {
+ ret = 503;
+ flb_error("limit type (key) must be a string");
+ goto parse_error;
+ }
+ if (val->via.map.ptr[0].val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ ret = 503;
+ flb_error("limit type must be an integer");
+ goto parse_error;
+ }
+ if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "seconds", val->via.map.ptr[0].key.via.str.size) == 0) {
+ limit.type = FLB_CHUNK_TRACE_LIMIT_TIME;
+ limit.seconds = val->via.map.ptr[0].val.via.u64;
+ }
+ else if (strncmp(val->via.map.ptr[0].key.via.str.ptr, "count", val->via.map.ptr[0].key.via.str.size) == 0) {
+ limit.type = FLB_CHUNK_TRACE_LIMIT_COUNT;
+ limit.count = val->via.map.ptr[0].val.via.u64;
+ }
+ else {
+ ret = 503;
+ flb_error("unknown limit type");
+ goto parse_error;
+ }
+ }
+ }
+
+ if (output_name == NULL) {
+ output_name = flb_sds_create("stdout");
+ }
+
+ ret = enable_trace_input(hs, input_name, prefix, output_name, props);
+ if (ret != 0) {
+ flb_error("error when enabling tracing");
+ goto parse_error;
+ }
+
+ if (limit.type != 0) {
+ input_instance = find_input(hs, input_name);
+ if (limit.type == FLB_CHUNK_TRACE_LIMIT_TIME) {
+ flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.seconds);
+ }
+ else if (limit.type == FLB_CHUNK_TRACE_LIMIT_COUNT) {
+ flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.count);
+ }
+ }
+ }
+
+ msgpack_pack_map(mp_pck, 1);
+ msgpack_pack_str_with_body(mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok"));
+
+ ret = 200;
+parse_error:
+ if (prefix) flb_sds_destroy(prefix);
+ if (output_name) flb_sds_destroy(output_name);
+ if (props != NULL) {
+ flb_kv_release(props);
+ flb_free(props);
+ }
+unpack_error:
+ msgpack_unpacked_destroy(&result);
+ if (buf != NULL) {
+ flb_free(buf);
+ }
+input_error:
+ return ret;
+}
+
+static void cb_trace(mk_request_t *request, void *data)
+{
+ flb_sds_t out_buf;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ int response = 404;
+ flb_sds_t input_name = NULL;
+
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ input_name = get_input_name(request);
+ if (input_name == NULL) {
+ response = 404;
+ goto error;
+ }
+
+ if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) {
+ response = http_enable_trace(request, data, input_name, &mp_pck);
+ }
+ else if (request->method == MK_METHOD_DELETE) {
+ response = http_disable_trace(request, data, input_name, &mp_pck);
+ }
+error:
+ if (response == 404) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found"));
+ }
+ else if (response == 503) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ }
+
+ if (input_name != NULL) {
+ flb_sds_destroy(input_name);
+ }
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (out_buf == NULL) {
+ mk_http_status(request, 503);
+ mk_http_done(request);
+ return;
+ }
+
+ mk_http_status(request, response);
+ mk_http_send(request, out_buf, flb_sds_len(out_buf), NULL);
+ mk_http_done(request);
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ flb_sds_destroy(out_buf);
+}
+
+static void cb_traces(mk_request_t *request, void *data)
+{
+ flb_sds_t out_buf;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ int ret;
+ char *buf = NULL;
+ size_t buf_size;
+ int root_type = MSGPACK_OBJECT_ARRAY;
+ msgpack_unpacked result;
+ flb_sds_t error_msg = NULL;
+ int response = 200;
+ flb_sds_t input_name;
+ msgpack_object_array *inputs = NULL;
+ size_t off = 0;
+ int i;
+
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_unpacked_init(&result);
+ ret = flb_pack_json(request->data.data, request->data.len, &buf, &buf_size,
+ &root_type, NULL);
+ if (ret == -1) {
+ goto unpack_error;
+ }
+
+ ret = msgpack_unpack_next(&result, buf, buf_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ ret = -1;
+ error_msg = flb_sds_create("unfinished input");
+ goto unpack_error;
+ }
+
+ if (result.data.type != MSGPACK_OBJECT_MAP) {
+ response = 503;
+ error_msg = flb_sds_create("input is not an object");
+ goto unpack_error;
+ }
+
+ for (i = 0; i < result.data.via.map.size; i++) {
+ if (result.data.via.map.ptr[i].val.type != MSGPACK_OBJECT_ARRAY) {
+ continue;
+ }
+ if (result.data.via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+ if (result.data.via.map.ptr[i].key.via.str.size < strlen("inputs")) {
+ continue;
+ }
+ if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, "inputs", strlen("inputs"))) {
+ continue;
+ }
+ inputs = &result.data.via.map.ptr[i].val.via.array;
+ }
+
+ if (inputs == NULL) {
+ response = 503;
+ error_msg = flb_sds_create("inputs not found");
+ goto unpack_error;
+ }
+
+ msgpack_pack_map(&mp_pck, 2);
+
+ msgpack_pack_str_with_body(&mp_pck, "inputs", strlen("inputs"));
+ msgpack_pack_map(&mp_pck, inputs->size);
+
+ for (i = 0; i < inputs->size; i++) {
+ input_name = flb_sds_create_len(inputs->ptr[i].via.str.ptr, inputs->ptr[i].via.str.size);
+ msgpack_pack_str_with_body(&mp_pck, input_name, flb_sds_len(input_name));
+
+ if (inputs->ptr[i].type != MSGPACK_OBJECT_STR) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ }
+ else {
+ if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) {
+ ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, input_name);
+ if (ret != 0) {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "returncode", strlen("returncode"));
+ msgpack_pack_int64(&mp_pck, ret);
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok"));
+ }
+ }
+ else if (request->method == MK_METHOD_DELETE) {
+ disable_trace_input((struct flb_hs *)data, input_name);
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "message", strlen("message"));
+ msgpack_pack_str_with_body(&mp_pck, "method not allowed", strlen("method not allowed"));
+ }
+ }
+ }
+
+ msgpack_pack_str_with_body(&mp_pck, "result", strlen("result"));
+unpack_error:
+ if (buf != NULL) {
+ flb_free(buf);
+ }
+ msgpack_unpacked_destroy(&result);
+ if (response == 404) {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found"));
+ }
+ else if (response == 503) {
+ msgpack_pack_map(&mp_pck, 2);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "error", strlen("error"));
+ msgpack_pack_str_with_body(&mp_pck, "message", strlen("message"));
+ if (error_msg) {
+ msgpack_pack_str_with_body(&mp_pck, error_msg, flb_sds_len(error_msg));
+ flb_sds_destroy(error_msg);
+ }
+ else {
+ msgpack_pack_str_with_body(&mp_pck, "unknown error", strlen("unknown error"));
+ }
+ }
+ else {
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "status", strlen("status"));
+ msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok"));
+ }
+
+ /* Export to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (out_buf == NULL) {
+ out_buf = flb_sds_create("serialization error");
+ }
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ mk_http_status(request, response);
+ mk_http_send(request,
+ out_buf, flb_sds_len(out_buf), NULL);
+ mk_http_done(request);
+
+ flb_sds_destroy(out_buf);
+}
+
+/* Perform registration */
+int api_v1_trace(struct flb_hs *hs)
+{
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs);
+ mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs);
+ return 0;
+}