diff options
Diffstat (limited to 'fluent-bit/plugins/out_splunk')
-rw-r--r-- | fluent-bit/plugins/out_splunk/CMakeLists.txt | 6 | ||||
-rw-r--r-- | fluent-bit/plugins/out_splunk/splunk.c | 873 | ||||
-rw-r--r-- | fluent-bit/plugins/out_splunk/splunk.h | 119 | ||||
-rw-r--r-- | fluent-bit/plugins/out_splunk/splunk_conf.c | 313 | ||||
-rw-r--r-- | fluent-bit/plugins/out_splunk/splunk_conf.h | 29 |
5 files changed, 1340 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_splunk/CMakeLists.txt b/fluent-bit/plugins/out_splunk/CMakeLists.txt new file mode 100644 index 00000000..da66bca7 --- /dev/null +++ b/fluent-bit/plugins/out_splunk/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + splunk_conf.c + splunk.c + ) + +FLB_PLUGIN(out_splunk "${src}" "") diff --git a/fluent-bit/plugins/out_splunk/splunk.c b/fluent-bit/plugins/out_splunk/splunk.c new file mode 100644 index 00000000..d9c28380 --- /dev/null +++ b/fluent-bit/plugins/out_splunk/splunk.c @@ -0,0 +1,873 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_mp.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_gzip.h> +#include <fluent-bit/flb_ra_key.h> +#include <fluent-bit/flb_metrics.h> +#include <fluent-bit/flb_log_event_decoder.h> + +#include <msgpack.h> +#include "splunk.h" +#include "splunk_conf.h" + +static int cb_splunk_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_splunk *ctx; + + ctx = flb_splunk_conf_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "configuration failed"); + return -1; + } + + flb_output_set_context(ins, ctx); + + /* + * This plugin instance uses the HTTP client interface, let's register + * it debugging callbacks. + */ + flb_output_set_http_debug_callbacks(ins); + return 0; +} + +static int pack_map_meta(struct flb_splunk *ctx, + struct flb_mp_map_header *mh, + msgpack_packer *mp_pck, + msgpack_object map, + char *tag, int tag_len) +{ + int index_key_set = FLB_FALSE; + int sourcetype_key_set = FLB_FALSE; + flb_sds_t str; + struct mk_list *head; + struct flb_splunk_field *f; + struct flb_mp_map_header mh_fields; + struct flb_ra_value *rval; + + /* event host */ + if (ctx->event_host) { + str = flb_ra_translate(ctx->ra_event_host, tag, tag_len, + map, NULL); + if (str) { + if (flb_sds_len(str) > 0) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_HOST) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_HOST, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_HOST) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(str)); + msgpack_pack_str_body(mp_pck, str, flb_sds_len(str)); + } + flb_sds_destroy(str); + } + } + + /* event source */ + if (ctx->event_source) { + str = flb_ra_translate(ctx->ra_event_source, tag, tag_len, + map, NULL); + if (str) { + if (flb_sds_len(str) > 0) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCE) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_SOURCE, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCE) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(str)); + msgpack_pack_str_body(mp_pck, str, flb_sds_len(str)); + } + flb_sds_destroy(str); + } + } + + /* event sourcetype (key lookup) */ + if (ctx->event_sourcetype_key) { + str = flb_ra_translate(ctx->ra_event_sourcetype_key, tag, tag_len, + map, NULL); + if (str) { + /* sourcetype_key was found */ + if (flb_sds_len(str) > 0) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCET) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_SOURCET, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCET) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(str)); + msgpack_pack_str_body(mp_pck, str, flb_sds_len(str)); + sourcetype_key_set = FLB_TRUE; + } + flb_sds_destroy(str); + } + /* If not found, it will fallback to the value set in event_sourcetype */ + } + + if (sourcetype_key_set == FLB_FALSE && ctx->event_sourcetype) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCET) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_SOURCET, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_SOURCET) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(ctx->event_sourcetype)); + msgpack_pack_str_body(mp_pck, + ctx->event_sourcetype, flb_sds_len(ctx->event_sourcetype)); + } + + /* event index (key lookup) */ + if (ctx->event_index_key) { + str = flb_ra_translate(ctx->ra_event_index_key, tag, tag_len, + map, NULL); + if (str) { + /* sourcetype_key was found */ + if (flb_sds_len(str) > 0) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_INDEX) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_INDEX, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_INDEX) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(str)); + msgpack_pack_str_body(mp_pck, str, flb_sds_len(str)); + index_key_set = FLB_TRUE; + } + flb_sds_destroy(str); + } + /* If not found, it will fallback to the value set in event_index */ + } + + if (index_key_set == FLB_FALSE && ctx->event_index) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_INDEX) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_INDEX, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_INDEX) - 1); + msgpack_pack_str(mp_pck, flb_sds_len(ctx->event_index)); + msgpack_pack_str_body(mp_pck, + ctx->event_index, flb_sds_len(ctx->event_index)); + } + + /* event 'fields' */ + if (mk_list_size(&ctx->fields) > 0) { + flb_mp_map_header_append(mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT_FIELDS) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT_FIELDS, + sizeof(FLB_SPLUNK_DEFAULT_EVENT_FIELDS) - 1); + + /* Pack map */ + flb_mp_map_header_init(&mh_fields, mp_pck); + + mk_list_foreach(head, &ctx->fields) { + f = mk_list_entry(head, struct flb_splunk_field, _head); + rval = flb_ra_get_value_object(f->ra, map); + if (!rval) { + continue; + } + + flb_mp_map_header_append(&mh_fields); + + /* key */ + msgpack_pack_str(mp_pck, flb_sds_len(f->key_name)); + msgpack_pack_str_body(mp_pck, f->key_name, flb_sds_len(f->key_name)); + + /* value */ + msgpack_pack_object(mp_pck, rval->o); + flb_ra_key_value_destroy(rval); + } + flb_mp_map_header_end(&mh_fields); + } + + return 0; +} + +static int pack_map(struct flb_splunk *ctx, msgpack_packer *mp_pck, + struct flb_time *tm, msgpack_object map, + char *tag, int tag_len) +{ + int i; + double t; + int map_size; + msgpack_object k; + msgpack_object v; + struct flb_mp_map_header mh; + + t = flb_time_to_double(tm); + map_size = map.via.map.size; + + if (ctx->splunk_send_raw == FLB_TRUE) { + msgpack_pack_map(mp_pck, map_size /* all k/v */); + } + else { + flb_mp_map_header_init(&mh, mp_pck); + + /* Append the time key */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_TIME) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_TIME, + sizeof(FLB_SPLUNK_DEFAULT_TIME) - 1); + msgpack_pack_double(mp_pck, t); + + /* Pack Splunk metadata */ + pack_map_meta(ctx, &mh, mp_pck, map, tag, tag_len); + + /* Add k/v pairs under the key 'event' instead of to the top level object */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT, + sizeof(FLB_SPLUNK_DEFAULT_EVENT) - 1); + + flb_mp_map_header_end(&mh); + + msgpack_pack_map(mp_pck, map_size); + } + + /* Append k/v */ + for (i = 0; i < map_size; i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + msgpack_pack_object(mp_pck, k); + msgpack_pack_object(mp_pck, v); + } + + return 0; +} + + +static inline int pack_event_key(struct flb_splunk *ctx, msgpack_packer *mp_pck, + struct flb_time *tm, msgpack_object map, + char *tag, int tag_len) +{ + double t; + struct flb_mp_map_header mh; + flb_sds_t val; + + t = flb_time_to_double(tm); + val = flb_ra_translate(ctx->ra_event_key, tag, tag_len, map, NULL); + if (!val || flb_sds_len(val) == 0) { + if (val != NULL) { + flb_sds_destroy(val); + } + + return -1; + } + + if (ctx->splunk_send_raw == FLB_FALSE) { + flb_mp_map_header_init(&mh, mp_pck); + + /* Append the time key */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_TIME) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_TIME, + sizeof(FLB_SPLUNK_DEFAULT_TIME) - 1); + msgpack_pack_double(mp_pck, t); + + /* Pack Splunk metadata */ + pack_map_meta(ctx, &mh, mp_pck, map, tag, tag_len); + + /* Add k/v pairs under the key 'event' instead of to the top level object */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT) -1); + msgpack_pack_str_body(mp_pck, + FLB_SPLUNK_DEFAULT_EVENT, + sizeof(FLB_SPLUNK_DEFAULT_EVENT) - 1); + + flb_mp_map_header_end(&mh); + } + + msgpack_pack_str(mp_pck, flb_sds_len(val)); + msgpack_pack_str_body(mp_pck, val, flb_sds_len(val)); + flb_sds_destroy(val); + + return 0; +} + +#ifdef FLB_HAVE_METRICS +static inline int splunk_metrics_format(struct flb_output_instance *ins, + const void *in_buf, size_t in_bytes, + char **out_buf, size_t *out_size, + struct flb_splunk *ctx) +{ + int ret; + size_t off = 0; + cfl_sds_t text; + cfl_sds_t host; + struct cmt *cmt = NULL; + + if (ctx->event_host != NULL) { + host = ctx->event_host; + } + else { + host = "localhost"; + } + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *) in_buf, in_bytes, &off); + if (ret != 0) { + flb_plg_error(ins, "could not process metrics payload"); + return -1; + } + + /* convert to text representation */ + text = cmt_encode_splunk_hec_create(cmt, host, ctx->event_index, ctx->event_source, ctx->event_sourcetype); + + /* destroy cmt context */ + cmt_destroy(cmt); + + *out_buf = text; + *out_size = flb_sds_len(text); + + return 0; +} +#endif + +static inline int splunk_format(const void *in_buf, size_t in_bytes, + char *tag, int tag_len, + char **out_buf, size_t *out_size, + struct flb_splunk *ctx) +{ + int ret; + msgpack_object map; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + char *err; + flb_sds_t tmp; + flb_sds_t record; + flb_sds_t json_out; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + json_out = flb_sds_create_size(in_bytes * 1.5); + if (!json_out) { + flb_errno(); + return -1; + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *) in_buf, in_bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + flb_sds_destroy(json_out); + + return -1; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + map = *log_event.body; + + if (ctx->event_key) { + /* Pack the value of a event key */ + ret = pack_event_key(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len); + if (ret != 0) { + /* + * if pack_event_key fails due to missing content in the + * record, we just warn the user and try to pack it + * as a normal map. + */ + ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len); + } + } + else { + /* Pack as a map */ + ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len); + } + + /* Validate packaging */ + if (ret != 0) { + /* Format invalid record */ + err = flb_msgpack_to_json_str(2048, &map); + if (err) { + /* Print error and continue processing other records */ + flb_plg_warn(ctx->ins, "could not process or pack record: %s", err); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_free(err); + } + continue; + } + + /* Format as JSON */ + record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (!record) { + flb_errno(); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_sds_destroy(json_out); + return -1; + } + + /* On raw mode, append a breakline to every record */ + if (ctx->splunk_send_raw) { + tmp = flb_sds_cat(record, "\n", 1); + if (tmp) { + record = tmp; + } + } + + tmp = flb_sds_cat(json_out, record, flb_sds_len(record)); + flb_sds_destroy(record); + if (tmp) { + json_out = tmp; + } + else { + flb_errno(); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_sds_destroy(json_out); + return -1; + } + msgpack_sbuffer_destroy(&mp_sbuf); + } + + *out_buf = json_out; + *out_size = flb_sds_len(json_out); + + flb_log_event_decoder_destroy(&log_decoder); + + return 0; +} + +static void debug_request_response(struct flb_splunk *ctx, + struct flb_http_client *c) +{ + int ret; + int uncompressed = FLB_FALSE; + time_t now; + void *tmp_buf = NULL; + size_t tmp_size; + size_t req_size; + char *req_buf = NULL; + struct tm result; + struct tm *current; + unsigned char *ptr; + flb_sds_t req_headers = NULL; + flb_sds_t req_body = NULL; + + if (c->body_len > 3) { + ptr = (unsigned char *) c->body_buf; + if (ptr[0] == 0x1F && ptr[1] == 0x8B && ptr[2] == 0x08) { + /* uncompress payload */ + ret = flb_gzip_uncompress((void *) c->body_buf, c->body_len, + &tmp_buf, &tmp_size); + if (ret == -1) { + fprintf(stdout, "[out_splunk] could not uncompress data\n"); + } + else { + req_buf = (char *) tmp_buf; + req_size = tmp_size; + uncompressed = FLB_TRUE; + } + } + else { + req_buf = (char *) c->body_buf; + req_size = c->body_len; + } + + /* create a safe buffer */ + if (req_buf) { + req_body = flb_sds_create_len(req_buf, req_size); + } + } + + req_headers = flb_sds_create_len(c->header_buf, c->header_len); + + if (c->resp.data) + now = time(NULL); + current = localtime_r(&now, &result); + + fprintf(stdout, + "[%i/%02i/%02i %02i:%02i:%02i] " + "[out_splunk] debug HTTP 400 (bad request)\n" + ">>> request\n" + "%s%s\n\n" + "<<< response\n" + "%s\n\n", + + current->tm_year + 1900, + current->tm_mon + 1, + current->tm_mday, + current->tm_hour, + current->tm_min, + current->tm_sec, + + req_headers, + req_body, + c->resp.data); + + if (uncompressed) { + flb_free(tmp_buf); + } + + if (req_headers) { + flb_sds_destroy(req_headers); + } + if (req_body) { + flb_sds_destroy(req_body); + } +} + +static void cb_splunk_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int compressed = FLB_FALSE; + size_t b_sent; + flb_sds_t buf_data; + size_t resp_size; + size_t buf_size; + char *endpoint; + struct flb_splunk *ctx = out_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + void *payload_buf; + size_t payload_size; + (void) i_ins; + (void) config; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + +#ifdef FLB_HAVE_METRICS + /* Check if the event type is metrics, handle the payload differently */ + if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { + ret = splunk_metrics_format(ctx->ins, + event_chunk->data, + event_chunk->size, + &buf_data, &buf_size, ctx); + } +#endif + if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { + /* Convert binary logs into a JSON payload */ + ret = splunk_format(event_chunk->data, + event_chunk->size, + (char *) event_chunk->tag, + flb_sds_len(event_chunk->tag), + &buf_data, &buf_size, ctx); + } + + if (ret == -1) { + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Map buffer */ + payload_buf = buf_data; + payload_size = buf_size; + + /* Should we compress the payload ? */ + if (ctx->compress_gzip == FLB_TRUE) { + ret = flb_gzip_compress((void *) buf_data, buf_size, + &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, + "cannot gzip payload, disabling compression"); + } + else { + compressed = FLB_TRUE; + + /* JSON buffer is not longer needed */ + flb_sds_destroy(buf_data); + } + } + + /* Splunk URI endpoint */ + if (ctx->splunk_send_raw) { + endpoint = FLB_SPLUNK_DEFAULT_URI_RAW; + } + else { + endpoint = FLB_SPLUNK_DEFAULT_URI_EVENT; + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, endpoint, + payload_buf, payload_size, NULL, 0, NULL, 0); + + /* HTTP Response buffer size, honor value set by the user */ + if (ctx->buffer_size > 0) { + flb_http_buffer_size(c, ctx->buffer_size); + } + else { + /* + * If no value was set, we try to accomodate by using our post + * payload size * 1.5, on that way we make room for large responses + * if something goes wrong, so we don't get a partial response. + */ + resp_size = payload_size * 1.5; + if (resp_size < 4096) { + resp_size = 4096; + } + flb_http_buffer_size(c, resp_size); + } + + /* HTTP Client */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + /* Try to use http_user and http_passwd if not, fallback to auth_header */ + if (ctx->http_user && ctx->http_passwd) { + flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + } + else if (ctx->auth_header) { + flb_http_add_header(c, "Authorization", 13, + ctx->auth_header, flb_sds_len(ctx->auth_header)); + } + + /* Append Channel identifier header */ + if (ctx->channel) { + flb_http_add_header(c, FLB_SPLUNK_CHANNEL_IDENTIFIER_HEADER, + strlen(FLB_SPLUNK_CHANNEL_IDENTIFIER_HEADER), + ctx->channel, ctx->channel_len); + } + + /* Content Encoding: gzip */ + if (compressed == FLB_TRUE) { + flb_http_set_content_encoding_gzip(c); + } + + /* Map debug callbacks */ + flb_http_client_debug(c, ctx->ins->callback); + + /* Perform HTTP request */ + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + ret = FLB_RETRY; + } + else { + if (c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_warn(ctx->ins, "http_status=%i:\n%s", + c->resp.status, c->resp.payload); + } + else { + flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status); + } + /* + * Requests that get 4xx responses from the Splunk HTTP Event + * Collector will 'always' fail, so there is no point in retrying + * them: + * + * https://docs.splunk.com/Documentation/Splunk/8.0.5/Data/TroubleshootHTTPEventCollector#Possible_error_codes + */ + ret = (c->resp.status < 400 || c->resp.status >= 500) ? + FLB_RETRY : FLB_ERROR; + + + if (c->resp.status == 400 && ctx->http_debug_bad_request) { + debug_request_response(ctx, c); + } + } + else { + ret = FLB_OK; + } + } + + /* + * If the payload buffer is different than incoming records in body, means + * we generated a different payload and must be freed. + */ + if (compressed == FLB_TRUE) { + flb_free(payload_buf); + } + else { + flb_sds_destroy(buf_data); + } + + /* Cleanup */ + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(ret); +} + +static int cb_splunk_exit(void *data, struct flb_config *config) +{ + struct flb_splunk *ctx = data; + + flb_splunk_conf_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "compress", NULL, + 0, FLB_FALSE, 0, + "Set payload compression mechanism. Option available is 'gzip'" + }, + + { + FLB_CONFIG_MAP_STR, "http_user", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, http_user), + "Set HTTP auth user" + }, + + { + FLB_CONFIG_MAP_STR, "http_passwd", "", + 0, FLB_TRUE, offsetof(struct flb_splunk, http_passwd), + "Set HTTP auth password" + }, + + { + FLB_CONFIG_MAP_SIZE, "http_buffer_size", NULL, + 0, FLB_FALSE, 0, + "Specify the buffer size used to read the response from the Splunk HTTP " + "service. This option is useful for debugging purposes where is required to read " + "full responses, note that response size grows depending of the number of records " + "inserted. To set an unlimited amount of memory set this value to 'false', " + "otherwise the value must be according to the Unit Size specification" + }, + + { + FLB_CONFIG_MAP_BOOL, "http_debug_bad_request", "false", + 0, FLB_TRUE, offsetof(struct flb_splunk, http_debug_bad_request), + "If the HTTP server response code is 400 (bad request) and this flag is " + "enabled, it will print the full HTTP request and response to the stdout " + "interface. This feature is available for debugging purposes." + }, + + { + FLB_CONFIG_MAP_STR, "event_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_key), + "Specify the key name that will be used to send a single value as part of the record." + }, + + { + FLB_CONFIG_MAP_STR, "event_host", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_host), + "Set the host value to the event data. The value allows a record accessor " + "pattern." + }, + + { + FLB_CONFIG_MAP_STR, "event_source", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_source), + "Set the source value to assign to the event data." + }, + + { + FLB_CONFIG_MAP_STR, "event_sourcetype", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_sourcetype), + "Set the sourcetype value to assign to the event data." + }, + + { + FLB_CONFIG_MAP_STR, "event_sourcetype_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_sourcetype_key), + "Set a record key that will populate 'sourcetype'. If the key is found, it will " + "have precedence over the value set in 'event_sourcetype'." + }, + + { + FLB_CONFIG_MAP_STR, "event_index", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_index), + "The name of the index by which the event data is to be indexed." + }, + + { + FLB_CONFIG_MAP_STR, "event_index_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, event_index_key), + "Set a record key that will populate the 'index' field. If the key is found, " + "it will have precedence over the value set in 'event_index'." + }, + + { + FLB_CONFIG_MAP_SLIST_2, "event_field", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, event_fields), + "Set event fields for the record. This option can be set multiple times and " + "the format is 'key_name record_accessor_pattern'." + }, + + { + FLB_CONFIG_MAP_STR, "splunk_token", NULL, + 0, FLB_FALSE, 0, + "Specify the Authentication Token for the HTTP Event Collector interface." + }, + + { + FLB_CONFIG_MAP_BOOL, "splunk_send_raw", "off", + 0, FLB_TRUE, offsetof(struct flb_splunk, splunk_send_raw), + "When enabled, the record keys and values are set in the top level of the " + "map instead of under the event key. Refer to the Sending Raw Events section " + "from the docs for more details to make this option work properly." + }, + + { + FLB_CONFIG_MAP_STR, "channel", NULL, + 0, FLB_TRUE, offsetof(struct flb_splunk, channel), + "Specify X-Splunk-Request-Channel Header for the HTTP Event Collector interface." + }, + + /* EOF */ + {0} +}; + + +static int cb_splunk_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + struct flb_splunk *ctx = plugin_context; + + return splunk_format(data, bytes, (char *) tag, tag_len, + (char**) out_data, out_size,ctx); +} + +struct flb_output_plugin out_splunk_plugin = { + .name = "splunk", + .description = "Send events to Splunk HTTP Event Collector", + .cb_init = cb_splunk_init, + .cb_flush = cb_splunk_flush, + .cb_exit = cb_splunk_exit, + .config_map = config_map, + .workers = 2, +#ifdef FLB_HAVE_METRICS + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, +#endif + + /* for testing */ + .test_formatter.callback = cb_splunk_format_test, + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, +}; diff --git a/fluent-bit/plugins/out_splunk/splunk.h b/fluent-bit/plugins/out_splunk/splunk.h new file mode 100644 index 00000000..eef8fa8b --- /dev/null +++ b/fluent-bit/plugins/out_splunk/splunk.h @@ -0,0 +1,119 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_SPLUNK +#define FLB_OUT_SPLUNK + +#define FLB_SPLUNK_DEFAULT_HOST "127.0.0.1" +#define FLB_SPLUNK_DEFAULT_PORT 8088 +#define FLB_SPLUNK_DEFAULT_URI_RAW "/services/collector/raw" +#define FLB_SPLUNK_DEFAULT_URI_EVENT "/services/collector/event" +#define FLB_SPLUNK_DEFAULT_TIME "time" +#define FLB_SPLUNK_DEFAULT_EVENT_HOST "host" +#define FLB_SPLUNK_DEFAULT_EVENT_SOURCE "source" +#define FLB_SPLUNK_DEFAULT_EVENT_SOURCET "sourcetype" +#define FLB_SPLUNK_DEFAULT_EVENT_INDEX "index" +#define FLB_SPLUNK_DEFAULT_EVENT_FIELDS "fields" +#define FLB_SPLUNK_DEFAULT_EVENT "event" +#define FLB_SPLUNK_DEFAULT_HTTP_MAX "2M" + +#define FLB_SPLUNK_CHANNEL_IDENTIFIER_HEADER "X-Splunk-Request-Channel" + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_record_accessor.h> + +struct flb_splunk_field { + flb_sds_t key_name; + struct flb_record_accessor *ra; + struct mk_list _head; +}; + +struct flb_splunk { + /* Payload compression */ + int compress_gzip; + + /* HTTP Auth */ + char *http_user; + char *http_passwd; + + /* Event key */ + flb_sds_t event_key; + struct flb_record_accessor *ra_event_key; + + /* Event host */ + flb_sds_t event_host; + struct flb_record_accessor *ra_event_host; + + /* Event source */ + flb_sds_t event_source; + struct flb_record_accessor *ra_event_source; + + /* + * NOTE: EVENT SOURCE + * ------------------- + * we use two separate variables since we aim to specify a default in case + * a record accessor pattern is given but not found. The event_sourcetype_key + * has precedence over th the 'event_sourcetype' variable. + */ + + /* Event sourcetype */ + flb_sds_t event_sourcetype; + + /* Event sourcetype record key */ + flb_sds_t event_sourcetype_key; + struct flb_record_accessor *ra_event_sourcetype_key; + + /* Event index */ + flb_sds_t event_index; + + /* Event sourcetype record key */ + flb_sds_t event_index_key; + struct flb_record_accessor *ra_event_index_key; + + /* Event fields */ + struct mk_list *event_fields; + + /* Internal/processed event fields */ + struct mk_list fields; + + /* Token Auth */ + flb_sds_t auth_header; + + /* Channel identifier */ + flb_sds_t channel; + size_t channel_len; + + /* Send fields directly or pack data into "event" object */ + int splunk_send_raw; + + /* HTTP Client Setup */ + size_t buffer_size; + + /* HTTP: Debug bad requests (HTTP status 400) to stdout */ + int http_debug_bad_request; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Plugin instance */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/out_splunk/splunk_conf.c b/fluent-bit/plugins/out_splunk/splunk_conf.c new file mode 100644 index 00000000..cc911cbe --- /dev/null +++ b/fluent-bit/plugins/out_splunk/splunk_conf.c @@ -0,0 +1,313 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> + +#include "splunk.h" +#include "splunk_conf.h" + +static int event_fields_create(struct flb_splunk *ctx) +{ + int i = 0; + struct mk_list *head; + struct flb_slist_entry *kname; + struct flb_slist_entry *pattern; + struct flb_config_map_val *mv; + struct flb_splunk_field *f; + + if (!ctx->event_fields) { + return 0; + } + + flb_config_map_foreach(head, mv, ctx->event_fields) { + kname = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + pattern = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + f = flb_malloc(sizeof(struct flb_splunk_field)); + if (!f) { + flb_errno(); + return -1; + } + + f->key_name = flb_sds_create(kname->str); + if (!f->key_name) { + flb_free(f); + return -1; + } + + f->ra = flb_ra_create(pattern->str, FLB_TRUE); + if (!f->ra) { + flb_plg_error(ctx->ins, + "could not process event_field number #%i with " + "pattern '%s'", + i, pattern->str); + flb_sds_destroy(f->key_name); + flb_free(f); + return -1; + } + + mk_list_add(&f->_head, &ctx->fields); + } + + return 0; +} + +static void event_fields_destroy(struct flb_splunk *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_splunk_field *f; + + mk_list_foreach_safe(head, tmp, &ctx->fields) { + f = mk_list_entry(head, struct flb_splunk_field, _head); + flb_sds_destroy(f->key_name); + flb_ra_destroy(f->ra); + mk_list_del(&f->_head); + flb_free(f); + } +} + +struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + int io_flags = 0; + size_t size; + flb_sds_t t; + const char *tmp; + struct flb_upstream *upstream; + struct flb_splunk *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_splunk)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->fields); + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Set default network configuration */ + flb_output_net_default(FLB_SPLUNK_DEFAULT_HOST, FLB_SPLUNK_DEFAULT_PORT, ins); + + /* use TLS ? */ + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, + ins->tls); + if (!upstream) { + flb_plg_error(ctx->ins, "cannot create Upstream context"); + flb_splunk_conf_destroy(ctx); + return NULL; + } + + /* Set manual Index and Type */ + ctx->u = upstream; + + tmp = flb_output_get_property("http_buffer_size", ins); + if (!tmp) { + ctx->buffer_size = 0; + } + else { + size = flb_utils_size_to_bytes(tmp); + if (size == -1) { + flb_plg_error(ctx->ins, "invalid 'buffer_size' value"); + flb_splunk_conf_destroy(ctx); + return NULL; + } + if (size < 4 *1024) { + size = 4 * 1024; + } + ctx->buffer_size = size; + } + + /* Compress (gzip) */ + tmp = flb_output_get_property("compress", ins); + ctx->compress_gzip = FLB_FALSE; + if (tmp) { + if (strcasecmp(tmp, "gzip") == 0) { + ctx->compress_gzip = FLB_TRUE; + } + } + + /* Event key */ + if (ctx->event_key) { + if (ctx->event_key[0] != '$') { + flb_plg_error(ctx->ins, + "invalid event_key pattern, it must start with '$'"); + flb_splunk_conf_destroy(ctx); + return NULL; + } + ctx->ra_event_key = flb_ra_create(ctx->event_key, FLB_TRUE); + if (!ctx->ra_event_key) { + flb_plg_error(ctx->ins, + "cannot create record accessor for event_key pattern: '%s'", + ctx->event_key); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* Event host */ + if (ctx->event_host) { + ctx->ra_event_host = flb_ra_create(ctx->event_host, FLB_TRUE); + if (!ctx->ra_event_host) { + flb_plg_error(ctx->ins, + "cannot create record accessor for event_key pattern: '%s'", + ctx->event_host); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* Event source */ + if (ctx->event_source) { + ctx->ra_event_source = flb_ra_create(ctx->event_source, FLB_TRUE); + if (!ctx->ra_event_source) { + flb_plg_error(ctx->ins, + "cannot create record accessor for event_source pattern: '%s'", + ctx->event_host); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* Event source (key lookup) */ + if (ctx->event_sourcetype_key) { + ctx->ra_event_sourcetype_key = flb_ra_create(ctx->event_sourcetype_key, FLB_TRUE); + if (!ctx->ra_event_sourcetype_key) { + flb_plg_error(ctx->ins, + "cannot create record accessor for " + "event_sourcetype_key pattern: '%s'", + ctx->event_host); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* Event index (key lookup) */ + if (ctx->event_index_key) { + ctx->ra_event_index_key = flb_ra_create(ctx->event_index_key, FLB_TRUE); + if (!ctx->ra_event_index_key) { + flb_plg_error(ctx->ins, + "cannot create record accessor for " + "event_index_key pattern: '%s'", + ctx->event_host); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* Event fields */ + ret = event_fields_create(ctx); + if (ret == -1) { + flb_splunk_conf_destroy(ctx); + return NULL; + } + + /* No http_user is set, fallback to splunk_token, if splunk_token is unset, fail. */ + if (!ctx->http_user) { + /* Splunk Auth Token */ + tmp = flb_output_get_property("splunk_token", ins); + if(!tmp) { + flb_plg_error(ctx->ins, "either splunk_token or http_user should be set"); + flb_splunk_conf_destroy(ctx); + return NULL; + } + ctx->auth_header = flb_sds_create("Splunk "); + t = flb_sds_cat(ctx->auth_header, tmp, strlen(tmp)); + if (t) { + ctx->auth_header = t; + } + else { + flb_plg_error(ctx->ins, "error on token generation"); + flb_splunk_conf_destroy(ctx); + return NULL; + } + } + + /* channel */ + if (ctx->channel != NULL) { + ctx->channel_len = flb_sds_len(ctx->channel); + } + + /* Set instance flags into upstream */ + flb_output_upstream_set(ctx->u, ins); + + return ctx; +} + +int flb_splunk_conf_destroy(struct flb_splunk *ctx) +{ + if (!ctx) { + return -1; + } + + if (ctx->auth_header) { + flb_sds_destroy(ctx->auth_header); + } + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->ra_event_key) { + flb_ra_destroy(ctx->ra_event_key); + } + + if (ctx->ra_event_host) { + flb_ra_destroy(ctx->ra_event_host); + } + + if (ctx->ra_event_source) { + flb_ra_destroy(ctx->ra_event_source); + } + + if (ctx->ra_event_sourcetype_key) { + flb_ra_destroy(ctx->ra_event_sourcetype_key); + } + + if (ctx->ra_event_index_key) { + flb_ra_destroy(ctx->ra_event_index_key); + } + + event_fields_destroy(ctx); + + flb_free(ctx); + + return 0; +} diff --git a/fluent-bit/plugins/out_splunk/splunk_conf.h b/fluent-bit/plugins/out_splunk/splunk_conf.h new file mode 100644 index 00000000..c5114b1f --- /dev/null +++ b/fluent-bit/plugins/out_splunk/splunk_conf.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_SPLUNK_CONF_H +#define FLB_OUT_SPLUNK_CONF_H + +#include "splunk.h" + +struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_splunk_conf_destroy(struct flb_splunk *ctx); + +#endif |