diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_vivo_exporter/vivo_http.c')
-rw-r--r-- | src/fluent-bit/plugins/out_vivo_exporter/vivo_http.c | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_vivo_exporter/vivo_http.c b/src/fluent-bit/plugins/out_vivo_exporter/vivo_http.c new file mode 100644 index 000000000..efd39dcc8 --- /dev/null +++ b/src/fluent-bit/plugins/out_vivo_exporter/vivo_http.c @@ -0,0 +1,266 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_server.h> + +#include "vivo.h" +#include "vivo_http.h" +#include "vivo_stream.h" + +#define VIVO_CONTENT_TYPE "Content-Type" +#define VIVO_CONTENT_TYPE_JSON "application/json" +#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID" +#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID" + +static int stream_get_uri_properties(mk_request_t *request, + int64_t *from, int64_t *to, int64_t *limit) +{ + char *ptr; + flb_sds_t buf; + + *from = -1; + *to = -1; + *limit = -1; + + buf = flb_sds_create_len(request->query_string.data, request->query_string.len); + if (!buf) { + return -1; + } + + ptr = strstr(buf, "from="); + if (ptr) { + *from = atol(ptr + 5); + } + + ptr = strstr(buf, "to="); + if (ptr) { + *to = atol(ptr + 3); + } + + ptr = strstr(buf, "limit="); + if (ptr) { + *limit = atol(ptr + 6); + } + + flb_sds_destroy(buf); + + return 0; +} + +static void headers_set(mk_request_t *request, struct vivo_stream *vs) +{ + struct vivo_exporter *ctx; + + + /* parent context */ + ctx = vs->parent; + + /* content type */ + mk_http_header(request, + VIVO_CONTENT_TYPE, sizeof(VIVO_CONTENT_TYPE) - 1, + VIVO_CONTENT_TYPE_JSON, sizeof(VIVO_CONTENT_TYPE_JSON) - 1); + + /* CORS */ + if (ctx->http_cors_allow_origin) { + mk_http_header(request, + "Access-Control-Allow-Origin", + sizeof("Access-Control-Allow-Origin") - 1, + ctx->http_cors_allow_origin, + flb_sds_len(ctx->http_cors_allow_origin)); + + mk_http_header(request, + "Access-Control-Allow-Headers", + sizeof("Access-Control-Allow-Headers") - 1, + "Origin, X-Requested-With, Content-Type, Accept", + sizeof("Origin, X-Requested-With, Content-Type, Accept") - 1); + + mk_http_header(request, + "Access-Control-Expose-Headers", + sizeof("Access-Control-Expose-Headers") - 1, + "vivo-stream-start-id, vivo-stream-end-id", + sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1); + + } +} + +static void serve_content(mk_request_t *request, struct vivo_stream *vs) +{ + int64_t from = -1; + int64_t to = -1; + int64_t limit = -1; + int64_t stream_start_id = -1; + int64_t stream_end_id = -1; + flb_sds_t payload; + flb_sds_t str_start; + flb_sds_t str_end; + + + if (request->query_string.len > 0) { + stream_get_uri_properties(request, &from, &to, &limit); + } + + payload = vivo_stream_get_content(vs, from, to, limit, + &stream_start_id, &stream_end_id); + if (!payload) { + mk_http_status(request, 500); + return; + } + + if (flb_sds_len(payload) == 0) { + mk_http_status(request, 200); + headers_set(request, vs); + flb_sds_destroy(payload); + return; + } + + mk_http_status(request, 200); + + /* set response headers */ + headers_set(request, vs); + + /* stream ids served: compose buffer and set headers */ + str_start = flb_sds_create_size(32); + flb_sds_printf(&str_start, "%" PRId64, stream_start_id); + + str_end = flb_sds_create_size(32); + flb_sds_printf(&str_end, "%" PRId64, stream_end_id); + + mk_http_header(request, + VIVO_STREAM_START_ID, sizeof(VIVO_STREAM_START_ID) - 1, + str_start, flb_sds_len(str_start)); + + mk_http_header(request, + VIVO_STREAM_END_ID, sizeof(VIVO_STREAM_END_ID) - 1, + str_end, flb_sds_len(str_end)); + + /* send payload */ + mk_http_send(request, payload, flb_sds_len(payload), NULL); + + /* release */ + flb_sds_destroy(payload); + flb_sds_destroy(str_start); + flb_sds_destroy(str_end); +} + +/* HTTP endpoint: /logs */ +static void cb_logs(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_logs); + mk_http_done(request); +} + +/* HTTP endpoint: /metrics */ +static void cb_metrics(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_metrics); + mk_http_done(request); +} + +static void cb_traces(mk_request_t *request, void *data) +{ + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + serve_content(request, ctx->stream_traces); + mk_http_done(request); +} + +/* HTTP endpoint: / (root) */ +static void cb_root(mk_request_t *request, void *data) +{ + (void) data; + + mk_http_status(request, 200); + mk_http_send(request, "Fluent Bit Vivo Exporter\n", 24, NULL); + mk_http_done(request); +} + +struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx, + const char *listen, + int tcp_port, + struct flb_config *config) +{ + int vid; + char tmp[32]; + struct vivo_http *ph; + + ph = flb_malloc(sizeof(struct vivo_http)); + if (!ph) { + flb_errno(); + return NULL; + } + ph->config = config; + + /* HTTP Server context */ + ph->ctx = mk_create(); + if (!ph->ctx) { + flb_free(ph); + return NULL; + } + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) -1, "%s:%d", listen, tcp_port); + mk_config_set(ph->ctx, + "Listen", tmp, + "Workers", "1", + NULL); + + /* Virtual host */ + vid = mk_vhost_create(ph->ctx, NULL); + ph->vid = vid; + + /* Set HTTP URI callbacks */ + mk_vhost_handler(ph->ctx, vid, "/logs", cb_logs, ctx); + mk_vhost_handler(ph->ctx, vid, "/metrics", cb_metrics, ctx); + mk_vhost_handler(ph->ctx, vid, "/traces", cb_traces, ctx); + mk_vhost_handler(ph->ctx, vid, "/", cb_root, NULL); + + return ph; +} + +void vivo_http_server_destroy(struct vivo_http *ph) +{ + if (ph) { + /* TODO: release mk_vhost */ + if (ph->ctx) { + mk_destroy(ph->ctx); + } + flb_free(ph); + } +} + +int vivo_http_server_start(struct vivo_http *ph) +{ + return mk_start(ph->ctx); +} + +int vivo_http_server_stop(struct vivo_http *ph) +{ + return mk_stop(ph->ctx); +} |