diff options
Diffstat (limited to 'fluent-bit/plugins/in_splunk')
-rw-r--r-- | fluent-bit/plugins/in_splunk/CMakeLists.txt | 12 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk.c | 213 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk.h | 60 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_config.c | 184 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_config.h | 29 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_conn.c | 306 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_conn.h | 54 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_prot.c | 779 | ||||
-rw-r--r-- | fluent-bit/plugins/in_splunk/splunk_prot.h | 36 |
9 files changed, 1673 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_splunk/CMakeLists.txt b/fluent-bit/plugins/in_splunk/CMakeLists.txt new file mode 100644 index 00000000..42ecf2e3 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "Splunk input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + splunk.c + splunk_conn.c + splunk_prot.c + splunk_config.c + ) + +FLB_PLUGIN(in_splunk "${src}" "monkey-core-static") diff --git a/fluent-bit/plugins/in_splunk/splunk.c b/fluent-bit/plugins/in_splunk/splunk.c new file mode 100644 index 00000000..78589037 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk.c @@ -0,0 +1,213 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_network.h> +#include <fluent-bit/flb_config.h> + +#include "splunk.h" +#include "splunk_conn.h" +#include "splunk_config.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int in_splunk_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct splunk_conn *conn; + struct flb_splunk *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", + connection->fd); + + conn = splunk_conn_add(connection, ctx); + + if (conn == NULL) { + flb_downstream_conn_release(connection); + + return -1; + } + + return 0; +} + +static int in_splunk_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_splunk *ctx; + + (void) data; + + /* Create context and basic conf */ + ctx = splunk_config_create(ins); + if (!ctx) { + return -1; + } + + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + splunk_config_destroy(ctx); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + splunk_config_destroy(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + in_splunk_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + splunk_config_destroy(ctx); + + return -1; + } + + ctx->collector_id = ret; + + return 0; +} + +static int in_splunk_exit(void *data, struct flb_config *config) +{ + struct flb_splunk *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + splunk_config_destroy(ctx); + } + + return 0; +} + + +static void in_splunk_pause(void *data, struct flb_config *config) +{ + struct flb_splunk *ctx = data; + + flb_input_collector_pause(ctx->collector_id, ctx->ins); + +} + +static void in_splunk_resume(void *data, struct flb_config *config) +{ + struct flb_splunk *ctx = data; + + flb_input_collector_resume(ctx->collector_id, ctx->ins); +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_splunk, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_splunk, buffer_chunk_size), + "" + }, + + { + FLB_CONFIG_MAP_SLIST_1, "success_header", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, success_headers), + "Add an HTTP header key/value pair on success. Multiple headers can be set" + }, + + { + FLB_CONFIG_MAP_STR, "splunk_token", NULL, + 0, FLB_FALSE, 0, + "Set valid Splunk HEC tokens for the requests" + }, + + { + FLB_CONFIG_MAP_STR, "tag_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, tag_key), + "" + }, + + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_splunk_plugin = { + .name = "splunk", + .description = "Input plugin for Splunk HEC payloads", + .cb_init = in_splunk_init, + .cb_pre_run = NULL, + .cb_collect = in_splunk_collect, + .cb_flush_buf = NULL, + .cb_pause = in_splunk_pause, + .cb_resume = in_splunk_resume, + .cb_exit = in_splunk_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/fluent-bit/plugins/in_splunk/splunk.h b/fluent-bit/plugins/in_splunk/splunk.h new file mode 100644 index 00000000..bf935ea2 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_SPLUNK_H +#define FLB_IN_SPLUNK_H + +#include <fluent-bit/flb_downstream.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <monkey/monkey.h> + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_splunk { + flb_sds_t listen; + flb_sds_t tcp_port; + const char *tag_key; + + int collector_id; + + /* Success HTTP headers */ + struct mk_list *success_headers; + flb_sds_t success_headers_str; + + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + /* Token Auth */ + flb_sds_t auth_header; + + struct flb_log_event_encoder log_encoder; + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + + struct mk_server *server; + struct flb_input_instance *ins; +}; + + +#endif diff --git a/fluent-bit/plugins/in_splunk/splunk_config.c b/fluent-bit/plugins/in_splunk/splunk_config.c new file mode 100644 index 00000000..f6107015 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_config.c @@ -0,0 +1,184 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> + +#include "splunk.h" +#include "splunk_config.h" +#include "splunk_conn.h" +#include "splunk_config.h" + +struct flb_splunk *splunk_config_create(struct flb_input_instance *ins) +{ + struct mk_list *header_iterator; + struct flb_slist_entry *header_value; + struct flb_slist_entry *header_name; + struct flb_config_map_val *header_pair; + char port[8]; + int ret; + struct flb_splunk *ctx; + const char *tmp; + + ctx = flb_calloc(1, sizeof(struct flb_splunk)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + ctx->auth_header = NULL; + tmp = flb_input_get_property("splunk_token", ins); + if (tmp) { + ctx->auth_header = flb_sds_create("Splunk "); + if (ctx->auth_header == NULL) { + flb_plg_error(ctx->ins, "error on prefix of auth_header generation"); + splunk_config_destroy(ctx); + return NULL; + } + ret = flb_sds_cat_safe(&ctx->auth_header, tmp, strlen(tmp)); + if (ret < 0) { + flb_plg_error(ctx->ins, "error on token generation"); + splunk_config_destroy(ctx); + return NULL; + } + } + + /* Listen interface (if not set, defaults to 0.0.0.0:8088) */ + flb_input_net_default_listener("0.0.0.0", 8088, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + if (ctx->server == NULL) { + flb_plg_error(ctx->ins, "error on mk_server allocation"); + splunk_config_destroy(ctx); + return NULL; + } + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! + */ + + ret = flb_log_event_encoder_init(&ctx->log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); + + splunk_config_destroy(ctx); + + return NULL; + } + + ctx->success_headers_str = flb_sds_create_size(1); + + if (ctx->success_headers_str == NULL) { + splunk_config_destroy(ctx); + + return NULL; + } + + flb_config_map_foreach(header_iterator, header_pair, ctx->success_headers) { + header_name = mk_list_entry_first(header_pair->val.list, + struct flb_slist_entry, + _head); + + header_value = mk_list_entry_last(header_pair->val.list, + struct flb_slist_entry, + _head); + + ret = flb_sds_cat_safe(&ctx->success_headers_str, + header_name->str, + flb_sds_len(header_name->str)); + + if (ret == 0) { + ret = flb_sds_cat_safe(&ctx->success_headers_str, + ": ", + 2); + } + + if (ret == 0) { + ret = flb_sds_cat_safe(&ctx->success_headers_str, + header_value->str, + flb_sds_len(header_value->str)); + } + + if (ret == 0) { + ret = flb_sds_cat_safe(&ctx->success_headers_str, + "\r\n", + 2); + } + + if (ret != 0) { + splunk_config_destroy(ctx); + + return NULL; + } + } + + return ctx; +} + +int splunk_config_destroy(struct flb_splunk *ctx) +{ + /* release all connections */ + splunk_conn_release_all(ctx); + + flb_log_event_encoder_destroy(&ctx->log_encoder); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->auth_header != NULL) { + flb_sds_destroy(ctx->auth_header); + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->server) { + flb_free(ctx->server); + } + + if (ctx->success_headers_str != NULL) { + flb_sds_destroy(ctx->success_headers_str); + } + + + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + return 0; +} diff --git a/fluent-bit/plugins/in_splunk/splunk_config.h b/fluent-bit/plugins/in_splunk/splunk_config.h new file mode 100644 index 00000000..24d2008f --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_SPLUNK_CONFIG_H +#define FLB_IN_SPLUNK_CONFIG_H + +#include <fluent-bit/flb_input_plugin.h> +#include "splunk.h" + +struct flb_splunk *splunk_config_create(struct flb_input_instance *ins); +int splunk_config_destroy(struct flb_splunk *ctx); + +#endif diff --git a/fluent-bit/plugins/in_splunk/splunk_conn.c b/fluent-bit/plugins/in_splunk/splunk_conn.c new file mode 100644 index 00000000..f605a16c --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_conn.c @@ -0,0 +1,306 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for thet specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_engine.h> + +#include "splunk.h" +#include "splunk_conn.h" +#include "splunk_prot.h" + +static void splunk_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int splunk_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct flb_connection *connection; + struct splunk_conn *conn; + struct mk_event *event; + struct flb_splunk *ctx; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + splunk_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + splunk_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + splunk_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + splunk_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + splunk_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + splunk_conn_request_init(&conn->session, &conn->request); + } + + /* FIXME: add Protocol handler here */ + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + splunk_conn_del(conn); + return -1; + } + + return 0; + +} + +static void splunk_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void splunk_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct splunk_conn *splunk_conn_add(struct flb_connection *connection, + struct flb_splunk *ctx) +{ + struct splunk_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct splunk_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = splunk_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + + flb_free(conn->buf_data); + flb_free(conn); + + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + splunk_conn_session_init(&conn->session, ctx->server, conn->connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. + */ + splunk_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + + return conn; +} + +int splunk_conn_del(struct splunk_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void splunk_conn_release_all(struct flb_splunk *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct splunk_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct splunk_conn, _head); + splunk_conn_del(conn); + } +} diff --git a/fluent-bit/plugins/in_splunk/splunk_conn.h b/fluent-bit/plugins/in_splunk/splunk_conn.h new file mode 100644 index 00000000..f4c955fc --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_conn.h @@ -0,0 +1,54 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_SPLUNK_CONN +#define FLB_IN_SPLUNK_CONN + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_connection.h> + +#include <monkey/mk_http.h> +#include <monkey/mk_http_parser.h> +#include <monkey/mk_utils.h> + +struct splunk_conn { + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_http->connections */ +}; + +struct splunk_conn *splunk_conn_add(struct flb_connection *connection, struct flb_splunk *ctx); +int splunk_conn_del(struct splunk_conn *conn); +void splunk_conn_release_all(struct flb_splunk *ctx); + + +#endif diff --git a/fluent-bit/plugins/in_splunk/splunk_prot.c b/fluent-bit/plugins/in_splunk/splunk_prot.c new file mode 100644 index 00000000..5b060608 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_prot.c @@ -0,0 +1,779 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_error.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_gzip.h> + +#include <monkey/monkey.h> +#include <monkey/mk_core.h> + +#include "splunk.h" +#include "splunk_conn.h" +#include "splunk_prot.h" + +#define HTTP_CONTENT_JSON 0 +#define HTTP_CONTENT_TEXT 1 +#define HTTP_CONTENT_UNKNOWN 2 + +static int send_response(struct splunk_conn *conn, int http_status, char *message) +{ + struct flb_splunk *context; + size_t sent; + int len; + flb_sds_t out; + + context = (struct flb_splunk *) conn->ctx; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created \r\n" + "Server: Fluent Bit v%s\r\n" + "%s" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR, + context->success_headers_str); + } + else if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "%s" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR, + context->success_headers_str); + } + else if (http_status == 204) { + flb_sds_printf(&out, + "HTTP/1.1 204 No Content\r\n" + "Server: Fluent Bit v%s\r\n" + "%s" + "\r\n\r\n", + FLB_VERSION_STR, + context->success_headers_str); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + else if (http_status == 401) { + flb_sds_printf(&out, + "HTTP/1.1 401 Unauthorized\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +static int send_json_message_response(struct splunk_conn *conn, int http_status, char *message) +{ + size_t sent; + int len; + flb_sds_t out; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %i\r\n\r\n%s", + len, message); + } + + /* We should check this operations result */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +/* implements functionality to get tag from key in record */ +static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) +{ + size_t map_size = map->via.map.size; + msgpack_object_kv *kv; + msgpack_object key; + msgpack_object val; + char *key_str = NULL; + char *val_str = NULL; + size_t key_str_size = 0; + size_t val_str_size = 0; + int j; + int check = FLB_FALSE; + int found = FLB_FALSE; + flb_sds_t tag; + + kv = map->via.map.ptr; + + for(j=0; j < map_size; j++) { + check = FLB_FALSE; + found = FLB_FALSE; + key = (kv+j)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) { + val = (kv+j)->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + val_str_size = val.via.str.size; + found = FLB_TRUE; + break; + } + if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + val_str_size = val.via.str.size; + found = FLB_TRUE; + break; + } + } + } + } + + if (found == FLB_TRUE) { + tag = flb_sds_create_len(val_str, val_str_size); + if (!tag) { + flb_errno(); + return NULL; + } + return tag; + } + + + flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key); + return NULL; +} + +/* + * Process a raw text payload for Splunk HEC requests, uses the delimited character to split records, + * return the number of processed bytes + */ +static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +{ + int ret = FLB_EVENT_ENCODER_SUCCESS; + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_current_timestamp(&ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_STRING_VALUE(buf, size)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_rollback_record(&ctx->log_encoder); + return -1; + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (tag) { + flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + } + else { + flb_plg_error(ctx->ins, "log event encoding error : %d", ret); + } + + return 0; +} + +static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *record, + flb_sds_t tag, flb_sds_t tag_from_record, + struct flb_time tm) { + int ret; + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &tm); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_msgpack_object( + &ctx->log_encoder, + record); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (tag_from_record) { + flb_input_log_append(ctx->ins, + tag_from_record, + flb_sds_len(tag_from_record), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + + flb_sds_destroy(tag_from_record); + } + else if (tag) { + flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + } +} + +static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +{ + size_t off = 0; + msgpack_unpacked result; + struct flb_time tm; + int i = 0; + msgpack_object *obj; + msgpack_object record; + flb_sds_t tag_from_record = NULL; + + flb_time_get(&tm); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type == MSGPACK_OBJECT_MAP) { + tag_from_record = NULL; + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, &result.data); + } + + process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm); + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + else if (result.data.type == MSGPACK_OBJECT_ARRAY) { + obj = &result.data; + for (i = 0; i < obj->via.array.size; i++) + { + record = obj->via.array.ptr[i]; + + tag_from_record = NULL; + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, &record); + } + + process_flb_log_append(ctx, &record, tag, tag_from_record, tm); + + /* TODO : Optimize this + * + * This is wasteful, considering that we are emitting a series + * of records we should start and commit each one and then + * emit them all at once after the loop. + */ + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + + break; + } + else { + flb_plg_error(ctx->ins, "skip record from invalid type: %i", + result.data.type); + + msgpack_unpacked_destroy(&result); + + return -1; + } + } + + msgpack_unpacked_destroy(&result); + + return 0; +} + +static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, + char *payload, size_t size) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART) { + flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); + return -1; + } + else if (ret == FLB_ERR_JSON_INVAL) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + else if (ret == -1) { + return -1; + } + + /* Process the packaged JSON and return the last byte used */ + process_json_payload_pack(ctx, tag, pack, out_size); + flb_free(pack); + + return 0; +} + +static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request) +{ + struct mk_http_header *auth_header = NULL; + + if (ctx->auth_header == NULL) { + return SPLUNK_AUTH_UNAUTH; + } + + auth_header = mk_http_header_get(MK_HEADER_AUTHORIZATION, request, NULL, 0); + + if (auth_header == NULL) { + return SPLUNK_AUTH_MISSING_CRED; + } + + if (auth_header != NULL && auth_header->val.len > 0) { + if (strncmp(ctx->auth_header, + auth_header->val.data, + strlen(ctx->auth_header)) == 0) { + return SPLUNK_AUTH_SUCCESS; + } + else { + return SPLUNK_AUTH_UNAUTHORIZED; + } + } + else { + return SPLUNK_AUTH_MISSING_CRED; + } + + return SPLUNK_AUTH_SUCCESS; +} + +static int handle_hec_payload(struct flb_splunk *ctx, int content_type, + flb_sds_t tag, char *buf, size_t size) +{ + int ret = -1; + + if (content_type == HTTP_CONTENT_JSON) { + ret = parse_hec_payload_json(ctx, tag, buf, size); + } + else if (content_type == HTTP_CONTENT_TEXT) { + ret = process_raw_payload_pack(ctx, tag, buf, size); + } + else if (content_type == HTTP_CONTENT_UNKNOWN) { + if (buf[0] == '{') { + ret = parse_hec_payload_json(ctx, tag, buf, size); + } + else { + ret = process_raw_payload_pack(ctx, tag, buf, size); + } + } + + return ret; +} + +static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i = 0; + int ret = 0; + int type = -1; + struct mk_http_header *header; + int extra_size = -1; + struct mk_http_header *headers_extra; + int gzip_compressed = FLB_FALSE; + void *gz_data = NULL; + size_t gz_size = -1; + + header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; + if (header->key.data == NULL) { + send_response(conn, 400, "error: header 'Content-Type' is not set\n"); + return -1; + } + + if (header->val.len == 16 && + strncasecmp(header->val.data, "application/json", 16) == 0) { + type = HTTP_CONTENT_JSON; + } + else if (header->val.len == 10 && + strncasecmp(header->val.data, "text/plain", 10) == 0) { + type = HTTP_CONTENT_TEXT; + } + else { + /* Not neccesary to specify content-type for Splunk HEC. */ + flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); + type = HTTP_CONTENT_UNKNOWN; + } + + if (request->data.len <= 0) { + send_response(conn, 400, "error: no payload found\n"); + return -1; + } + + extra_size = session->parser.headers_extra_count; + if (extra_size > 0) { + for (i = 0; i < extra_size; i++) { + headers_extra = &session->parser.headers_extra[i]; + if (headers_extra->key.len == 16 && + strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) { + if (headers_extra->val.len == 4 && + strncasecmp(headers_extra->val.data, "gzip", 4) == 0) { + flb_plg_debug(ctx->ins, "body is gzipped"); + gzip_compressed = FLB_TRUE; + } + } + } + } + + if (gzip_compressed == FLB_TRUE) { + ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, + &gz_data, &gz_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "gzip uncompress is failed"); + return -1; + } + + ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size); + flb_free(gz_data); + } + else { + ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len); + } + + return 0; +} + +static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int ret = -1; + struct mk_http_header *header; + + header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; + if (header->key.data == NULL) { + send_response(conn, 400, "error: header 'Content-Type' is not set\n"); + return -1; + } + else if (header->val.len != 10 || + strncasecmp(header->val.data, "text/plain", 10) != 0) { + /* Not neccesary to specify content-type for Splunk HEC. */ + flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); + } + + if (request->data.len <= 0) { + send_response(conn, 400, "error: no payload found\n"); + return -1; + } + + /* Always handle as raw type of payloads here */ + ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len); + + return ret; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + + if (request->uri.data[0] != '/') { + send_response(conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Refer the tag at first*/ + if (ctx->ins->tag && !ctx->ins->tag_default) { + tag = flb_sds_create(ctx->ins->tag); + if (tag == NULL) { + return -1; + } + } + else { + /* Compose the query string using the URI */ + len = strlen(uri); + + if (len == 1) { + tag = NULL; /* use default tag */ + } + else { + /* New tag skipping the URI '/' */ + tag = flb_sds_create_len(&uri[1], len - 1); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (request->host.data == NULL && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + mk_mem_free(uri); + + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + if (request->method == MK_METHOD_GET) { + /* Handle health minotoring of splunk hec endpoint for load balancers */ + if (strcasecmp(uri, "/services/collector/health") == 0) { + send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":200}"); + } + else { + send_response(conn, 400, "error: invalid HTTP endpoint\n"); + } + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return 0; + } + + /* Under services/collector endpoints are required for + * authentication if provided splunk_token */ + ret = validate_auth_header(ctx, request); + if (ret < 0){ + send_response(conn, 401, "error: unauthroized\n"); + if (ret == SPLUNK_AUTH_MISSING_CRED) { + flb_plg_warn(ctx->ins, "missing credentials in request headers"); + } + else if (ret == SPLUNK_AUTH_UNAUTHORIZED) { + flb_plg_warn(ctx->ins, "wrong credentials in request headers"); + } + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return -1; + } + + /* Handle every ingested payload cleanly */ + flb_log_event_encoder_reset(&ctx->log_encoder); + + if (request->method == MK_METHOD_POST) { + if (strcasecmp(uri, "/services/collector/raw") == 0) { + ret = process_hec_raw_payload(ctx, conn, tag, session, request); + + if (!ret) { + send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); + } + send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); + } + else if (strcasecmp(uri, "/services/collector/event") == 0 || + strcasecmp(uri, "/services/collector") == 0) { + ret = process_hec_payload(ctx, conn, tag, session, request); + + if (!ret) { + send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); + } + send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); + } + else { + send_response(conn, 400, "error: invalid HTTP endpoint\n"); + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return -1; + } + } + else { + /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ + + flb_sds_destroy(tag); + mk_mem_free(uri); + + send_response(conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + flb_sds_destroy(tag); + mk_mem_free(uri); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. + */ +int splunk_prot_handle_error(struct flb_splunk *ctx, struct splunk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(conn, 400, "error: invalid request\n"); + return -1; +} diff --git a/fluent-bit/plugins/in_splunk/splunk_prot.h b/fluent-bit/plugins/in_splunk/splunk_prot.h new file mode 100644 index 00000000..100f12d2 --- /dev/null +++ b/fluent-bit/plugins/in_splunk/splunk_prot.h @@ -0,0 +1,36 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_SPLUNK_PROT +#define FLB_IN_SPLUNK_PROT + +#define SPLUNK_AUTH_UNAUTH 1 +#define SPLUNK_AUTH_SUCCESS 0 +#define SPLUNK_AUTH_MISSING_CRED -1 +#define SPLUNK_AUTH_UNAUTHORIZED -2 + +int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int splunk_prot_handle_error(struct flb_splunk *ctx, struct splunk_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +#endif |