diff options
Diffstat (limited to 'fluent-bit/plugins/out_kinesis_streams')
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt | 5 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis.c | 499 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis.h | 109 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis_api.c | 987 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis_api.h | 44 |
5 files changed, 1644 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt b/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt new file mode 100644 index 000000000..d95110ee2 --- /dev/null +++ b/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt @@ -0,0 +1,5 @@ +set(src + kinesis.c + kinesis_api.c) + +FLB_PLUGIN(out_kinesis_streams "${src}" "") diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis.c b/fluent-bit/plugins/out_kinesis_streams/kinesis.c new file mode 100644 index 000000000..85dd37948 --- /dev/null +++ b/fluent-bit/plugins/out_kinesis_streams/kinesis.c @@ -0,0 +1,499 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_output_plugin.h> + +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_utils.h> + +#include <monkey/mk_core.h> +#include <msgpack.h> +#include <string.h> +#include <stdio.h> + +#include "kinesis.h" +#include "kinesis_api.h" + +static struct flb_aws_header content_type_header = { + .key = "Content-Type", + .key_len = 12, + .val = "application/x-amz-json-1.1", + .val_len = 26, +}; + +static int cb_kinesis_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + const char *tmp; + char *session_name = NULL; + struct flb_kinesis *ctx = NULL; + int ret; + (void) config; + (void) data; + + ctx = flb_calloc(1, sizeof(struct flb_kinesis)); + if (!ctx) { + flb_errno(); + return -1; + } + + ctx->ins = ins; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + goto error; + } + + tmp = flb_output_get_property("stream", ins); + if (tmp) { + ctx->stream_name = tmp; + } else { + flb_plg_error(ctx->ins, "'stream' is a required field"); + goto error; + } + + tmp = flb_output_get_property("time_key", ins); + if (tmp) { + ctx->time_key = tmp; + } + + tmp = flb_output_get_property("time_key_format", ins); + if (tmp) { + ctx->time_key_format = tmp; + } else { + ctx->time_key_format = DEFAULT_TIME_KEY_FORMAT; + } + + tmp = flb_output_get_property("log_key", ins); + if (tmp) { + ctx->log_key = tmp; + } + + if (ctx->log_key && ctx->time_key) { + flb_plg_error(ctx->ins, "'time_key' and 'log_key' can not be used together"); + goto error; + } + + tmp = flb_output_get_property("endpoint", ins); + if (tmp) { + ctx->custom_endpoint = FLB_TRUE; + ctx->endpoint = removeProtocol((char *) tmp, "https://"); + } + else { + ctx->custom_endpoint = FLB_FALSE; + } + + tmp = flb_output_get_property("sts_endpoint", ins); + if (tmp) { + ctx->sts_endpoint = (char *) tmp; + } + + + tmp = flb_output_get_property("log_key", ins); + if (tmp) { + ctx->log_key = tmp; + } + + tmp = flb_output_get_property("region", ins); + if (tmp) { + ctx->region = tmp; + } else { + flb_plg_error(ctx->ins, "'region' is a required field"); + goto error; + } + + tmp = flb_output_get_property("role_arn", ins); + if (tmp) { + ctx->role_arn = tmp; + } + + /* one tls instance for provider, one for cw client */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + + if (!ctx->cred_tls) { + flb_plg_error(ctx->ins, "Failed to create tls context"); + goto error; + } + + ctx->client_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!ctx->client_tls) { + flb_plg_error(ctx->ins, "Failed to create tls context"); + goto error; + } + + ctx->aws_provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + (char *) ctx->region, + ctx->sts_endpoint, + NULL, + flb_aws_client_generator(), + ctx->profile); + if (!ctx->aws_provider) { + flb_plg_error(ctx->ins, "Failed to create AWS Credential Provider"); + goto error; + } + + ctx->uuid = flb_sts_session_name(); + if (!ctx->uuid) { + flb_plg_error(ctx->ins, + "Failed to generate plugin instance UUID"); + goto error; + } + + if(ctx->role_arn) { + /* set up sts assume role provider */ + session_name = flb_sts_session_name(); + if (!session_name) { + flb_plg_error(ctx->ins, + "Failed to generate random STS session name"); + goto error; + } + + /* STS provider needs yet another separate TLS instance */ + ctx->sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!ctx->sts_tls) { + flb_errno(); + goto error; + } + + ctx->base_aws_provider = ctx->aws_provider; + + ctx->aws_provider = flb_sts_provider_create(config, + ctx->sts_tls, + ctx->base_aws_provider, + (char *) ctx->external_id, + (char *) ctx->role_arn, + session_name, + (char *) ctx->region, + ctx->sts_endpoint, + NULL, + flb_aws_client_generator()); + if (!ctx->aws_provider) { + flb_plg_error(ctx->ins, + "Failed to create AWS STS Credential Provider"); + goto error; + } + /* session name can freed after provider is created */ + flb_free(session_name); + session_name = NULL; + } + + /* initialize credentials and set to sync mode */ + ctx->aws_provider->provider_vtable->sync(ctx->aws_provider); + ctx->aws_provider->provider_vtable->init(ctx->aws_provider); + ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins); + + if (ctx->endpoint == NULL) { + ctx->endpoint = flb_aws_endpoint("kinesis", (char *) ctx->region); + if (!ctx->endpoint) { + goto error; + } + } + + struct flb_aws_client_generator *generator = flb_aws_client_generator(); + ctx->kinesis_client = generator->create(); + if (!ctx->kinesis_client) { + goto error; + } + ctx->kinesis_client->name = "kinesis_client"; + ctx->kinesis_client->has_auth = FLB_TRUE; + ctx->kinesis_client->provider = ctx->aws_provider; + ctx->kinesis_client->region = (char *) ctx->region; + ctx->kinesis_client->retry_requests = ctx->retry_requests; + ctx->kinesis_client->service = "kinesis"; + ctx->kinesis_client->port = 443; + ctx->kinesis_client->flags = 0; + ctx->kinesis_client->proxy = NULL; + ctx->kinesis_client->static_headers = &content_type_header; + ctx->kinesis_client->static_headers_len = 1; + + struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint, + 443, FLB_IO_TLS, + ctx->client_tls); + if (!upstream) { + flb_plg_error(ctx->ins, "Connection initialization error"); + goto error; + } + + ctx->kinesis_client->upstream = upstream; + flb_output_upstream_set(upstream, ctx->ins); + + ctx->kinesis_client->host = ctx->endpoint; + + /* Export context */ + flb_output_set_context(ins, ctx); + + return 0; + +error: + flb_free(session_name); + flb_plg_error(ctx->ins, "Initialization failed"); + flb_kinesis_ctx_destroy(ctx); + return -1; +} + +static struct flush *new_flush_buffer(const char *tag, int tag_len) +{ + struct flush *buf; + + + buf = flb_calloc(1, sizeof(struct flush)); + if (!buf) { + flb_errno(); + return NULL; + } + + buf->tmp_buf = flb_malloc(sizeof(char) * PUT_RECORDS_PAYLOAD_SIZE); + if (!buf->tmp_buf) { + flb_errno(); + kinesis_flush_destroy(buf); + return NULL; + } + buf->tmp_buf_size = PUT_RECORDS_PAYLOAD_SIZE; + + buf->events = flb_malloc(sizeof(struct kinesis_event) * MAX_EVENTS_PER_PUT); + if (!buf->events) { + flb_errno(); + kinesis_flush_destroy(buf); + return NULL; + } + buf->events_capacity = MAX_EVENTS_PER_PUT; + + buf->tag = tag; + buf->tag_len = tag_len; + + return buf; +} + +static void cb_kinesis_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + struct flb_kinesis *ctx = out_context; + int ret; + struct flush *buf; + (void) i_ins; + (void) config; + + buf = new_flush_buffer(event_chunk->tag, flb_sds_len(event_chunk->tag)); + if (!buf) { + flb_plg_error(ctx->ins, "Failed to construct flush buffer"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = process_and_send_to_kinesis(ctx, buf, + event_chunk->data, + event_chunk->size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to send records to kinesis"); + kinesis_flush_destroy(buf); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + flb_plg_debug(ctx->ins, "Processed %d records, sent %d to %s", + buf->records_processed, buf->records_sent, ctx->stream_name); + kinesis_flush_destroy(buf); + + FLB_OUTPUT_RETURN(FLB_OK); +} + +void flb_kinesis_ctx_destroy(struct flb_kinesis *ctx) +{ + if (ctx != NULL) { + if (ctx->base_aws_provider) { + flb_aws_provider_destroy(ctx->base_aws_provider); + } + + if (ctx->aws_provider) { + flb_aws_provider_destroy(ctx->aws_provider); + } + + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + + if (ctx->sts_tls) { + flb_tls_destroy(ctx->sts_tls); + } + + if (ctx->client_tls) { + flb_tls_destroy(ctx->client_tls); + } + + if (ctx->kinesis_client) { + flb_aws_client_destroy(ctx->kinesis_client); + } + + if (ctx->custom_endpoint == FLB_FALSE) { + flb_free(ctx->endpoint); + } + + if (ctx->uuid) { + flb_free(ctx->uuid); + } + + flb_free(ctx); + } +} + +static int cb_kinesis_exit(void *data, struct flb_config *config) +{ + struct flb_kinesis *ctx = data; + + flb_kinesis_ctx_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "region", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, region), + "The AWS region of your kinesis stream" + }, + + { + FLB_CONFIG_MAP_STR, "stream", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, stream_name), + "Kinesis stream name" + }, + + { + FLB_CONFIG_MAP_STR, "time_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, time_key), + "Add the timestamp to the record under this key. By default the timestamp " + "from Fluent Bit will not be added to records sent to Kinesis." + }, + + { + FLB_CONFIG_MAP_STR, "time_key_format", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, time_key_format), + "strftime compliant format string for the timestamp; for example, " + "the default is '%Y-%m-%dT%H:%M:%S'. This option is used with time_key. " + }, + + { + FLB_CONFIG_MAP_STR, "role_arn", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, role_arn), + "ARN of an IAM role to assume (ex. for cross account access)." + }, + + { + FLB_CONFIG_MAP_STR, "endpoint", NULL, + 0, FLB_FALSE, 0, + "Specify a custom endpoint for the Kinesis API" + }, + + { + FLB_CONFIG_MAP_STR, "sts_endpoint", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, sts_endpoint), + "Custom endpoint for the STS API." + }, + + { + FLB_CONFIG_MAP_STR, "external_id", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, external_id), + "Specify an external ID for the STS API, can be used with the role_arn parameter if your role " + "requires an external ID." + }, + + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, log_key), + "By default, the whole log record will be sent to Kinesis. " + "If you specify a key name with this option, then only the value of " + "that key will be sent to Kinesis. For example, if you are using " + "the Fluentd Docker log driver, you can specify `log_key log` and only " + "the log message will be sent to Kinesis." + }, + + { + FLB_CONFIG_MAP_BOOL, "auto_retry_requests", "true", + 0, FLB_TRUE, offsetof(struct flb_kinesis, retry_requests), + "Immediately retry failed requests to AWS services once. This option " + "does not affect the normal Fluent Bit retry mechanism with backoff. " + "Instead, it enables an immediate retry with no delay for networking " + "errors, which may help improve throughput when there are transient/random " + "networking issues." + }, + + { + FLB_CONFIG_MAP_STR, "profile", NULL, + 0, FLB_TRUE, offsetof(struct flb_kinesis, profile), + "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " + "$HOME/.aws/ directory." + }, + + /* EOF */ + {0} +}; + +/* Plugin registration */ +struct flb_output_plugin out_kinesis_streams_plugin = { + .name = "kinesis_streams", + .description = "Send logs to Amazon Kinesis Streams", + .cb_init = cb_kinesis_init, + .cb_flush = cb_kinesis_flush, + .cb_exit = cb_kinesis_exit, + .workers = 1, + .flags = 0, + + /* Configuration */ + .config_map = config_map, +}; diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis.h b/fluent-bit/plugins/out_kinesis_streams/kinesis.h new file mode 100644 index 000000000..75d41e107 --- /dev/null +++ b/fluent-bit/plugins/out_kinesis_streams/kinesis.h @@ -0,0 +1,109 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_KINESIS_H +#define FLB_OUT_KINESIS_H + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_signv4.h> + +#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" + +/* buffers used for each flush */ +struct flush { + /* temporary buffer for storing the serialized event messages */ + char *tmp_buf; + size_t tmp_buf_size; + /* current index of tmp_buf */ + size_t tmp_buf_offset; + + /* projected final size of the payload for this flush */ + size_t data_size; + + /* log records- each of these has a pointer to their message in tmp_buf */ + struct kinesis_event *events; + int events_capacity; + /* current event */ + int event_index; + + /* the payload of the API request */ + char *out_buf; + size_t out_buf_size; + + /* buffer used to temporarily hold an event during processing */ + char *event_buf; + size_t event_buf_size; + + int records_sent; + int records_processed; + + const char *tag; + int tag_len; +}; + +struct kinesis_event { + char *json; + size_t len; + struct timespec timestamp; +}; + +struct flb_kinesis { + /* + * TLS instances can not be re-used. So we have one for: + * - Base cred provider (needed for EKS provider) + * - STS Assume role provider + * - The CloudWatch Logs client for this plugin + */ + struct flb_tls *cred_tls; + struct flb_tls *sts_tls; + struct flb_tls *client_tls; + struct flb_aws_provider *aws_provider; + struct flb_aws_provider *base_aws_provider; + struct flb_aws_client *kinesis_client; + + /* configuration options */ + const char *stream_name; + const char *time_key; + const char *time_key_format; + const char *region; + const char *role_arn; + const char *log_key; + const char *external_id; + int retry_requests; + char *sts_endpoint; + int custom_endpoint; + char *profile; + + /* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */ + char *uuid; + + /* must be freed on shutdown if custom_endpoint is not set */ + char *endpoint; + + /* Plugin output instance reference */ + struct flb_output_instance *ins; +}; + +void flb_kinesis_ctx_destroy(struct flb_kinesis *ctx); + +#endif diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c new file mode 100644 index 000000000..9124657bc --- /dev/null +++ b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c @@ -0,0 +1,987 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_macros.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_log_event_decoder.h> + +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_base64.h> + +#include <monkey/mk_core.h> +#include <msgpack.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + +#ifndef FLB_SYSTEM_WINDOWS +#include <unistd.h> +#endif + +#include "kinesis_api.h" + +#define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException" + +static struct flb_aws_header put_records_target_header = { + .key = "X-Amz-Target", + .key_len = 12, + .val = "Kinesis_20131202.PutRecords", + .val_len = 27, +}; + +static inline int try_to_write(char *buf, int *off, size_t left, + const char *str, size_t str_len) +{ + if (str_len <= 0){ + str_len = strlen(str); + } + if (left <= *off+str_len) { + return FLB_FALSE; + } + memcpy(buf+*off, str, str_len); + *off += str_len; + return FLB_TRUE; +} + +/* + * Writes the "header" for a put_records payload + */ +static int init_put_payload(struct flb_kinesis *ctx, struct flush *buf, + int *offset) +{ + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "{\"StreamName\":\"", 15)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + ctx->stream_name, 0)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\",\"Records\":[", 13)) { + goto error; + } + return 0; + +error: + return -1; +} + +/* + * Simple and fast hashing algorithm to create random partition keys + */ +static flb_sds_t random_partition_key(const char *tag) +{ + int c; + unsigned long hash = 5381; + unsigned long hash2 = 5381; + flb_sds_t hash_str; + flb_sds_t tmp; + struct flb_time tm; + + /* get current time */ + flb_time_get(&tm); + + /* compose hash */ + while ((c = *tag++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; + + /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ + hash_str = flb_sds_create_size(64); + if (!hash_str) { + flb_errno(); + return NULL; + } + tmp = flb_sds_printf(&hash_str, "%lu%lu", hash % 7919, hash2 % 7919); + if (!tmp) { + flb_errno(); + flb_sds_destroy(hash_str); + return NULL; + } + hash_str = tmp; + + return hash_str; +} + +/* + * Writes a log event to the output buffer + */ +static int write_event(struct flb_kinesis *ctx, struct flush *buf, + struct kinesis_event *event, int *offset) +{ + flb_sds_t tag_timestamp = NULL; + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "{\"Data\":\"", 9)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + event->json, event->len)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\",\"PartitionKey\":\"", 18)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + ctx->uuid, 10)) { + goto error; + } + + tag_timestamp = random_partition_key(buf->tag); + if (!tag_timestamp) { + flb_plg_error(ctx->ins, "failed to generate partition key for %s", buf->tag); + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + tag_timestamp, 0)) { + flb_sds_destroy(tag_timestamp); + goto error; + } + flb_sds_destroy(tag_timestamp); + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"}", 2)) { + goto error; + } + + return 0; + +error: + return -1; +} + +/* Terminates a PutRecords payload */ +static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf, + int *offset) +{ + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "]}", 2)) { + return -1; + } + buf->out_buf[*offset] = '\0'; + + return 0; +} + + +/* + * Processes the msgpack object + * -1 = failure, record not added + * 0 = success, record added + * 1 = we ran out of space, send and retry + * 2 = record could not be processed, discard it + * Returns 0 on success, -1 on general errors, + * and 1 if we ran out of space to write the event + * which means a send must occur + */ +static int process_event(struct flb_kinesis *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms) +{ + size_t written = 0; + int ret; + size_t size; + size_t b64_len; + struct kinesis_event *event; + char *tmp_buf_ptr; + char *time_key_ptr; + struct tm time_stamp; + struct tm *tmp; + size_t len; + size_t tmp_size; + char *out_buf; + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + ret = flb_msgpack_to_json(tmp_buf_ptr, + buf->tmp_buf_size - buf->tmp_buf_offset, + obj); + if (ret <= 0) { + /* + * negative value means failure to write to buffer, + * which means we ran out of space, and must send the logs + * + * TODO: This could also incorrectly be triggered if the record + * is larger than MAX_EVENT_SIZE + */ + return 1; + } + written = (size_t) ret; + + /* Discard empty messages (written == 2 means '""') */ + if (written <= 2) { + flb_plg_debug(ctx->ins, "Found empty log message, %s", ctx->stream_name); + return 2; + } + + if (ctx->log_key) { + /* + * flb_msgpack_to_json will encase the value in quotes + * We don't want that for log_key, so we ignore the first + * and last character + */ + written -= 2; + tmp_buf_ptr++; /* pass over the opening quote */ + buf->tmp_buf_offset++; + } + + /* is (written + 1) because we still have to append newline */ + if ((written + 1) >= MAX_EVENT_SIZE) { + flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " + "max size allowed by Kinesis, %s", written + 1, + ctx->stream_name); + return 2; + } + + if (ctx->time_key) { + /* append time_key to end of json string */ + tmp = gmtime_r(&tms->tm.tv_sec, &time_stamp); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not create time stamp for %lu unix " + "seconds, discarding record, %s", tms->tm.tv_sec, + ctx->stream_name); + return 2; + } + + /* format time output and return the length */ + len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); + + /* how much space do we have left */ + tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; + if (len > tmp_size) { + /* not enough space - tell caller to retry */ + flb_free(out_buf); + return 1; + } + + if (len == 0) { + /* + * when the length of out_buf is not enough for time_key_format, + * time_key will not be added to record. + */ + flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", + ctx->time_key, ctx->stream_name); + flb_free(out_buf); + } + else { + time_key_ptr = tmp_buf_ptr + written - 1; + memcpy(time_key_ptr, ",", 1); + time_key_ptr++; + memcpy(time_key_ptr, "\"", 1); + time_key_ptr++; + memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); + time_key_ptr += strlen(ctx->time_key); + memcpy(time_key_ptr, "\":\"", 3); + time_key_ptr += 3; + + /* merge out_buf to time_key_ptr */ + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); + time_key_ptr += len; + memcpy(time_key_ptr, "\"}", 2); + time_key_ptr += 2; + written = (time_key_ptr - tmp_buf_ptr); + } + } + + /* is (written + 1) because we still have to append newline */ + if ((written + 1) >= MAX_EVENT_SIZE) { + flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " + "max size allowed by Kinesis, %s", written + 1, + ctx->stream_name); + return 2; + } + + /* append newline to record */ + + tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; + if (tmp_size <= 1) { + /* no space left- tell caller to retry */ + return 1; + } + + memcpy(tmp_buf_ptr + written, "\n", 1); + written++; + + /* + * check if event_buf is initialized and big enough + * Base64 encoding will increase size by ~4/3 + */ + size = (written * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) tmp_buf_ptr, written); + if (ret != 0) { + flb_errno(); + return -1; + } + written = b64_len; + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) { + /* not enough space, send logs */ + return 1; + } + + /* copy serialized json to tmp_buf */ + memcpy(tmp_buf_ptr, buf->event_buf, written); + + buf->tmp_buf_offset += written; + event = &buf->events[buf->event_index]; + event->json = tmp_buf_ptr; + event->len = written; + event->timestamp.tv_sec = tms->tm.tv_sec; + event->timestamp.tv_nsec = tms->tm.tv_nsec; + + return 0; +} + +/* Resets or inits a flush struct */ +static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) { + buf->event_index = 0; + buf->tmp_buf_offset = 0; + buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN; + buf->data_size += strlen(ctx->stream_name); +} + +/* constructs a put payload, and then sends */ +static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) { + int ret; + int offset; + int i; + struct kinesis_event *event; + + if (buf->event_index <= 0) { + /* + * event_index should always be 1 more than the actual last event index + * when this function is called. + * Except in the case where send_log_events() is called at the end of + * process_and_send_to_kinesis. If all records were already sent, event_index + * will be 0. Hence this check. + */ + return 0; + } + + /* alloc out_buf if needed */ + if (buf->out_buf == NULL || buf->out_buf_size < buf->data_size) { + if (buf->out_buf != NULL) { + flb_free(buf->out_buf); + } + buf->out_buf = flb_malloc(buf->data_size + 1); + if (!buf->out_buf) { + flb_errno(); + return -1; + } + buf->out_buf_size = buf->data_size; + } + + offset = 0; + ret = init_put_payload(ctx, buf, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize PutRecords payload, %s", + ctx->stream_name); + return -1; + } + + for (i = 0; i < buf->event_index; i++) { + event = &buf->events[i]; + ret = write_event(ctx, buf, event, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to write log record %d to " + "payload buffer, %s", i, ctx->stream_name); + return -1; + } + if (i != (buf->event_index -1)) { + if (!try_to_write(buf->out_buf, &offset, buf->out_buf_size, + ",", 1)) { + flb_plg_error(ctx->ins, "Could not terminate record with ','"); + return -1; + } + } + } + + ret = end_put_payload(ctx, buf, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not complete PutRecords payload"); + return -1; + } + flb_plg_debug(ctx->ins, "kinesis:PutRecords: events=%d, payload=%d bytes", i, offset); + ret = put_records(ctx, buf, (size_t) offset, i); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to send log records"); + return -1; + } + buf->records_sent += i; + + return 0; +} + +/* + * Processes the msgpack object, sends the current batch if needed + */ +static int add_event(struct flb_kinesis *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms) +{ + int ret; + struct kinesis_event *event; + int retry_add = FLB_FALSE; + size_t event_bytes = 0; + + if (buf->event_index == 0) { + /* init */ + reset_flush_buf(ctx, buf); + } + +retry_add_event: + retry_add = FLB_FALSE; + ret = process_event(ctx, buf, obj, tms); + if (ret < 0) { + return -1; + } + else if (ret == 1) { + if (buf->event_index <= 0) { + /* somehow the record was larger than our entire request buffer */ + flb_plg_warn(ctx->ins, "Discarding massive log record, %s", + ctx->stream_name); + return 0; /* discard this record and return to caller */ + } + /* send logs and then retry the add */ + retry_add = FLB_TRUE; + goto send; + } else if (ret == 2) { + /* discard this record and return to caller */ + flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", + ctx->stream_name); + return 0; + } + + event = &buf->events[buf->event_index]; + event_bytes = event->len + PUT_RECORDS_PER_RECORD_LEN; + + if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) { + if (buf->event_index <= 0) { + /* somehow the record was larger than our entire request buffer */ + flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s", + event_bytes, ctx->stream_name); + return 0; /* discard this record and return to caller */ + } + /* do not send this event */ + retry_add = FLB_TRUE; + goto send; + } + + /* send is not needed yet, return to caller */ + buf->data_size += event_bytes; + buf->event_index++; + + if (buf->event_index == MAX_EVENTS_PER_PUT) { + goto send; + } + + return 0; + +send: + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + + if (retry_add == FLB_TRUE) { + goto retry_add_event; + } + + return 0; +} + +/* + * Main routine- processes msgpack and sends in batches + * return value is the number of events processed (number sent is stored in buf) + */ +int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, + const char *data, size_t bytes) +{ + int i = 0; + size_t map_size; + msgpack_object map; + msgpack_object_kv *kv; + msgpack_object key; + msgpack_object val; + char *key_str = NULL; + size_t key_str_size = 0; + int j; + int ret; + int check = FLB_FALSE; + int found = FLB_FALSE; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return -1; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map = *log_event.body; + map_size = map.via.map.size; + + if (ctx->log_key) { + key_str = NULL; + key_str_size = 0; + check = FLB_FALSE; + found = FLB_FALSE; + + kv = map.via.map.ptr; + + for(j=0; j < map_size; j++) { + key = (kv+j)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { + found = FLB_TRUE; + val = (kv+j)->val; + ret = add_event(ctx, buf, &val, &log_event.timestamp); + if (ret < 0 ) { + goto error; + } + } + } + + } + if (found == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not find log_key '%s' in record, %s", + ctx->log_key, ctx->stream_name); + } + else { + i++; + } + continue; + } + + ret = add_event(ctx, buf, &map, &log_event.timestamp); + if (ret < 0 ) { + goto error; + } + i++; + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* send any remaining events */ + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + + /* return number of events processed */ + buf->records_processed = i; + return i; + +error: + flb_log_event_decoder_destroy(&log_decoder); + + return -1; +} + +/* + * Returns number of failed records on success, -1 on failure + */ +static int process_api_response(struct flb_kinesis *ctx, + struct flb_http_client *c) +{ + int i; + int k; + int w; + int ret; + int failed_records = -1; + int root_type; + char *out_buf; + int throughput_exceeded = FLB_FALSE; + size_t off = 0; + size_t out_size; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_object response; + msgpack_object response_key; + msgpack_object response_val; + + if (strstr(c->resp.payload, "\"FailedRecordCount\":0")) { + return 0; + } + + /* Convert JSON payload to msgpack */ + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &out_buf, &out_size, &root_type, NULL); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not pack/validate JSON API response\n%s", + c->resp.payload); + return -1; + } + + /* Lookup error field */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf, out_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", + c->resp.payload); + failed_records = -1; + goto done; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected payload type=%i", + root.type); + failed_records = -1; + goto done; + } + + for (i = 0; i < root.via.map.size; i++) { + key = root.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unexpected key type=%i", + key.type); + failed_records = -1; + goto done; + } + + if (key.via.str.size >= 14 && + strncmp(key.via.str.ptr, "FailedRecordCount", 14) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + flb_plg_error(ctx->ins, "unexpected 'FailedRecordCount' value type=%i", + val.type); + failed_records = -1; + goto done; + } + + failed_records = val.via.u64; + if (failed_records == 0) { + /* no need to check RequestResponses field */ + goto done; + } + } + + if (key.via.str.size >= 14 && + strncmp(key.via.str.ptr, "Records", 7) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected 'Records' value type=%i", + val.type); + failed_records = -1; + goto done; + } + + if (val.via.array.size == 0) { + flb_plg_error(ctx->ins, "'Records' field in response is empty"); + failed_records = -1; + goto done; + } + + for (k = 0; k < val.via.array.size; k++) { + /* iterate through the responses */ + response = val.via.array.ptr[k]; + if (response.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected 'Records[%d]' value type=%i", + k, response.type); + failed_records = -1; + goto done; + } + for (w = 0; w < response.via.map.size; w++) { + /* iterate through the response's keys */ + response_key = response.via.map.ptr[w].key; + if (response_key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unexpected key type=%i", + response_key.type); + failed_records = -1; + goto done; + } + if (response_key.via.str.size >= 9 && + strncmp(response_key.via.str.ptr, "ErrorCode", 9) == 0) { + response_val = response.via.map.ptr[w].val; + if (!throughput_exceeded && + response_val.via.str.size >= 38 && + (strncmp(response_val.via.str.ptr, + ERR_CODE_EXCEEDED_THROUGHPUT, 38) == 0)) { + throughput_exceeded = FLB_TRUE; + flb_plg_error(ctx->ins, "Throughput limits may have been exceeded, %s", + ctx->stream_name); + } + flb_plg_debug(ctx->ins, "Record %i failed with err_code=%.*s", + k, response_val.via.str.size, + response_val.via.str.ptr); + } + if (response_key.via.str.size >= 12 && + strncmp(response_key.via.str.ptr, "ErrorMessage", 12) == 0) { + response_val = response.via.map.ptr[w].val; + flb_plg_debug(ctx->ins, "Record %i failed with err_msg=%.*s", + k, response_val.via.str.size, + response_val.via.str.ptr); + } + } + } + } + } + + done: + flb_free(out_buf); + msgpack_unpacked_destroy(&result); + return failed_records; +} + +static int plugin_under_test() +{ + if (getenv("FLB_KINESIS_PLUGIN_UNDER_TEST") != NULL) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static char *mock_error_response(char *error_env_var) +{ + char *err_val = NULL; + char *error = NULL; + int len = 0; + + err_val = getenv(error_env_var); + if (err_val != NULL && strlen(err_val) > 0) { + error = flb_malloc(strlen(err_val) + sizeof(char)); + if (error == NULL) { + flb_errno(); + return NULL; + } + + len = strlen(err_val); + memcpy(error, err_val, len); + error[len] = '\0'; + return error; + } + + return NULL; +} + +static int partial_success() +{ + char *err_val = NULL; + + err_val = getenv("PARTIAL_SUCCESS_CASE"); + if (err_val != NULL && strlen(err_val) > 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static struct flb_http_client *mock_http_call(char *error_env_var) +{ + /* create an http client so that we can set the response */ + struct flb_http_client *c = NULL; + char *error = mock_error_response(error_env_var); + + c = flb_calloc(1, sizeof(struct flb_http_client)); + if (!c) { + flb_errno(); + flb_free(error); + return NULL; + } + mk_list_init(&c->headers); + + if (error != NULL) { + c->resp.status = 400; + /* resp.data is freed on destroy, payload is supposed to reference it */ + c->resp.data = error; + c->resp.payload = c->resp.data; + c->resp.payload_size = strlen(error); + } + else { + c->resp.status = 200; + c->resp.payload = ""; + c->resp.payload_size = 0; + if (partial_success() == FLB_TRUE) { + /* mocked partial failure response */ + c->resp.payload = "{\"FailedRecordCount\":2,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395012686\",\"ShardId\":\"shardId-000000000000\"},{\"ErrorCode\":\"ProvisionedThroughputExceededException\",\"ErrorMessage\":\"Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111.\"},{\"ErrorCode\":\"InternalFailure\",\"ErrorMessage\":\"Internal service failure.\"}]}"; + c->resp.payload_size = strlen(c->resp.payload); + } + else { + /* mocked success response */ + c->resp.payload = "{\"FailedRecordCount\":0,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395019266\",\"ShardId\":\"shardId-000000000000\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293522\",\"ShardId\":\"shardId-000000000001\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293588\",\"ShardId\":\"shardId-000000000003\"}]}"; + c->resp.payload_size = strlen(c->resp.payload); + } + } + + return c; +} + + +/* + * Returns -1 on failure, 0 on success + */ +int put_records(struct flb_kinesis *ctx, struct flush *buf, + size_t payload_size, int num_records) +{ + + struct flb_http_client *c = NULL; + struct flb_aws_client *kinesis_client; + flb_sds_t error; + int failed_records = 0; + + flb_plg_debug(ctx->ins, "Sending log records to stream %s", + ctx->stream_name); + + if (plugin_under_test() == FLB_TRUE) { + c = mock_http_call("TEST_PUT_RECORDS_ERROR"); + } + else { + kinesis_client = ctx->kinesis_client; + c = kinesis_client->client_vtable->request(kinesis_client, FLB_HTTP_POST, + "/", buf->out_buf, payload_size, + &put_records_target_header, 1); + } + + if (c) { + flb_plg_debug(ctx->ins, "PutRecords http status=%d", c->resp.status); + + if (c->resp.status == 200) { + /* Kinesis API can return partial success- check response */ + if (c->resp.payload_size > 0) { + failed_records = process_api_response(ctx, c); + if (failed_records < 0) { + flb_plg_error(ctx->ins, "PutRecords response " + "could not be parsed, %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + if (failed_records == num_records) { + flb_plg_error(ctx->ins, "PutRecords request returned " + "with no records successfully recieved, %s", + ctx->stream_name); + flb_http_client_destroy(c); + return -1; + } + if (failed_records > 0) { + flb_plg_error(ctx->ins, "%d out of %d records failed to be " + "delivered, will retry this batch, %s", + failed_records, num_records, + ctx->stream_name); + flb_http_client_destroy(c); + return -1; + } + } + flb_plg_debug(ctx->ins, "Sent events to %s", ctx->stream_name); + flb_http_client_destroy(c); + return 0; + } + + /* Check error */ + if (c->resp.payload_size > 0) { + error = flb_aws_error(c->resp.payload, c->resp.payload_size); + if (error != NULL) { + if (strcmp(error, ERR_CODE_EXCEEDED_THROUGHPUT) == 0) { + flb_plg_error(ctx->ins, "Throughput limits for %s " + "may have been exceeded.", + ctx->stream_name); + } + if (strncmp(error, "SerializationException", 22) == 0) { + /* + * If this happens, we habe a bug in the code + * User should send us the output to debug + */ + flb_plg_error(ctx->ins, "<<------Bug in Code------>>"); + printf("Malformed request: %s", buf->out_buf); + } + flb_aws_print_error(c->resp.payload, c->resp.payload_size, + "PutRecords", ctx->ins); + flb_sds_destroy(error); + } + else { + /* error could not be parsed, print raw response to debug */ + flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); + } + } + } + + flb_plg_error(ctx->ins, "Failed to send log records to %s", ctx->stream_name); + if (c) { + flb_http_client_destroy(c); + } + return -1; +} + + +void kinesis_flush_destroy(struct flush *buf) +{ + if (buf) { + flb_free(buf->tmp_buf); + flb_free(buf->out_buf); + flb_free(buf->events); + flb_free(buf->event_buf); + flb_free(buf); + } +} diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h new file mode 100644 index 000000000..e44de6d4d --- /dev/null +++ b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h @@ -0,0 +1,44 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_KINESIS_API +#define FLB_OUT_KINESIS_API + +#define PUT_RECORDS_PAYLOAD_SIZE 5242880 +#define MAX_EVENTS_PER_PUT 500 +#define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */ + +/* number of characters needed to 'start' a PutRecords payload */ +#define PUT_RECORDS_HEADER_LEN 30 +/* number of characters needed per record in a PutRecords payload */ +#define PUT_RECORDS_PER_RECORD_LEN 48 +/* number of characters needed to 'end' a PutRecords payload */ +#define PUT_RECORDS_FOOTER_LEN 4 + +#include "kinesis.h" + +void kinesis_flush_destroy(struct flush *buf); + +int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, + const char *data, size_t bytes); + +int put_records(struct flb_kinesis *ctx, struct flush *buf, + size_t payload_size, int num_records); + +#endif |