diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/out_kinesis_streams | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_kinesis_streams')
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt | 5 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis.c | 499 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis.h | 109 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis_api.c | 987 | ||||
-rw-r--r-- | fluent-bit/plugins/out_kinesis_streams/kinesis_api.h | 44 |
5 files changed, 0 insertions, 1644 deletions
diff --git a/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt b/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt deleted file mode 100644 index d95110ee..00000000 --- a/fluent-bit/plugins/out_kinesis_streams/CMakeLists.txt +++ /dev/null @@ -1,5 +0,0 @@ -set(src - kinesis.c - kinesis_api.c) - -FLB_PLUGIN(out_kinesis_streams "${src}" "") diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis.c b/fluent-bit/plugins/out_kinesis_streams/kinesis.c deleted file mode 100644 index 85dd3794..00000000 --- a/fluent-bit/plugins/out_kinesis_streams/kinesis.c +++ /dev/null @@ -1,499 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_slist.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_config_map.h> -#include <fluent-bit/flb_output_plugin.h> - -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_aws_credentials.h> -#include <fluent-bit/flb_aws_util.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_utils.h> - -#include <monkey/mk_core.h> -#include <msgpack.h> -#include <string.h> -#include <stdio.h> - -#include "kinesis.h" -#include "kinesis_api.h" - -static struct flb_aws_header content_type_header = { - .key = "Content-Type", - .key_len = 12, - .val = "application/x-amz-json-1.1", - .val_len = 26, -}; - -static int cb_kinesis_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - const char *tmp; - char *session_name = NULL; - struct flb_kinesis *ctx = NULL; - int ret; - (void) config; - (void) data; - - ctx = flb_calloc(1, sizeof(struct flb_kinesis)); - if (!ctx) { - flb_errno(); - return -1; - } - - ctx->ins = ins; - - /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "configuration error"); - goto error; - } - - tmp = flb_output_get_property("stream", ins); - if (tmp) { - ctx->stream_name = tmp; - } else { - flb_plg_error(ctx->ins, "'stream' is a required field"); - goto error; - } - - tmp = flb_output_get_property("time_key", ins); - if (tmp) { - ctx->time_key = tmp; - } - - tmp = flb_output_get_property("time_key_format", ins); - if (tmp) { - ctx->time_key_format = tmp; - } else { - ctx->time_key_format = DEFAULT_TIME_KEY_FORMAT; - } - - tmp = flb_output_get_property("log_key", ins); - if (tmp) { - ctx->log_key = tmp; - } - - if (ctx->log_key && ctx->time_key) { - flb_plg_error(ctx->ins, "'time_key' and 'log_key' can not be used together"); - goto error; - } - - tmp = flb_output_get_property("endpoint", ins); - if (tmp) { - ctx->custom_endpoint = FLB_TRUE; - ctx->endpoint = removeProtocol((char *) tmp, "https://"); - } - else { - ctx->custom_endpoint = FLB_FALSE; - } - - tmp = flb_output_get_property("sts_endpoint", ins); - if (tmp) { - ctx->sts_endpoint = (char *) tmp; - } - - - tmp = flb_output_get_property("log_key", ins); - if (tmp) { - ctx->log_key = tmp; - } - - tmp = flb_output_get_property("region", ins); - if (tmp) { - ctx->region = tmp; - } else { - flb_plg_error(ctx->ins, "'region' is a required field"); - goto error; - } - - tmp = flb_output_get_property("role_arn", ins); - if (tmp) { - ctx->role_arn = tmp; - } - - /* one tls instance for provider, one for cw client */ - ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - - if (!ctx->cred_tls) { - flb_plg_error(ctx->ins, "Failed to create tls context"); - goto error; - } - - ctx->client_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->client_tls) { - flb_plg_error(ctx->ins, "Failed to create tls context"); - goto error; - } - - ctx->aws_provider = flb_standard_chain_provider_create(config, - ctx->cred_tls, - (char *) ctx->region, - ctx->sts_endpoint, - NULL, - flb_aws_client_generator(), - ctx->profile); - if (!ctx->aws_provider) { - flb_plg_error(ctx->ins, "Failed to create AWS Credential Provider"); - goto error; - } - - ctx->uuid = flb_sts_session_name(); - if (!ctx->uuid) { - flb_plg_error(ctx->ins, - "Failed to generate plugin instance UUID"); - goto error; - } - - if(ctx->role_arn) { - /* set up sts assume role provider */ - session_name = flb_sts_session_name(); - if (!session_name) { - flb_plg_error(ctx->ins, - "Failed to generate random STS session name"); - goto error; - } - - /* STS provider needs yet another separate TLS instance */ - ctx->sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->sts_tls) { - flb_errno(); - goto error; - } - - ctx->base_aws_provider = ctx->aws_provider; - - ctx->aws_provider = flb_sts_provider_create(config, - ctx->sts_tls, - ctx->base_aws_provider, - (char *) ctx->external_id, - (char *) ctx->role_arn, - session_name, - (char *) ctx->region, - ctx->sts_endpoint, - NULL, - flb_aws_client_generator()); - if (!ctx->aws_provider) { - flb_plg_error(ctx->ins, - "Failed to create AWS STS Credential Provider"); - goto error; - } - /* session name can freed after provider is created */ - flb_free(session_name); - session_name = NULL; - } - - /* initialize credentials and set to sync mode */ - ctx->aws_provider->provider_vtable->sync(ctx->aws_provider); - ctx->aws_provider->provider_vtable->init(ctx->aws_provider); - ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins); - - if (ctx->endpoint == NULL) { - ctx->endpoint = flb_aws_endpoint("kinesis", (char *) ctx->region); - if (!ctx->endpoint) { - goto error; - } - } - - struct flb_aws_client_generator *generator = flb_aws_client_generator(); - ctx->kinesis_client = generator->create(); - if (!ctx->kinesis_client) { - goto error; - } - ctx->kinesis_client->name = "kinesis_client"; - ctx->kinesis_client->has_auth = FLB_TRUE; - ctx->kinesis_client->provider = ctx->aws_provider; - ctx->kinesis_client->region = (char *) ctx->region; - ctx->kinesis_client->retry_requests = ctx->retry_requests; - ctx->kinesis_client->service = "kinesis"; - ctx->kinesis_client->port = 443; - ctx->kinesis_client->flags = 0; - ctx->kinesis_client->proxy = NULL; - ctx->kinesis_client->static_headers = &content_type_header; - ctx->kinesis_client->static_headers_len = 1; - - struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint, - 443, FLB_IO_TLS, - ctx->client_tls); - if (!upstream) { - flb_plg_error(ctx->ins, "Connection initialization error"); - goto error; - } - - ctx->kinesis_client->upstream = upstream; - flb_output_upstream_set(upstream, ctx->ins); - - ctx->kinesis_client->host = ctx->endpoint; - - /* Export context */ - flb_output_set_context(ins, ctx); - - return 0; - -error: - flb_free(session_name); - flb_plg_error(ctx->ins, "Initialization failed"); - flb_kinesis_ctx_destroy(ctx); - return -1; -} - -static struct flush *new_flush_buffer(const char *tag, int tag_len) -{ - struct flush *buf; - - - buf = flb_calloc(1, sizeof(struct flush)); - if (!buf) { - flb_errno(); - return NULL; - } - - buf->tmp_buf = flb_malloc(sizeof(char) * PUT_RECORDS_PAYLOAD_SIZE); - if (!buf->tmp_buf) { - flb_errno(); - kinesis_flush_destroy(buf); - return NULL; - } - buf->tmp_buf_size = PUT_RECORDS_PAYLOAD_SIZE; - - buf->events = flb_malloc(sizeof(struct kinesis_event) * MAX_EVENTS_PER_PUT); - if (!buf->events) { - flb_errno(); - kinesis_flush_destroy(buf); - return NULL; - } - buf->events_capacity = MAX_EVENTS_PER_PUT; - - buf->tag = tag; - buf->tag_len = tag_len; - - return buf; -} - -static void cb_kinesis_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - struct flb_kinesis *ctx = out_context; - int ret; - struct flush *buf; - (void) i_ins; - (void) config; - - buf = new_flush_buffer(event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (!buf) { - flb_plg_error(ctx->ins, "Failed to construct flush buffer"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - ret = process_and_send_to_kinesis(ctx, buf, - event_chunk->data, - event_chunk->size); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to send records to kinesis"); - kinesis_flush_destroy(buf); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - flb_plg_debug(ctx->ins, "Processed %d records, sent %d to %s", - buf->records_processed, buf->records_sent, ctx->stream_name); - kinesis_flush_destroy(buf); - - FLB_OUTPUT_RETURN(FLB_OK); -} - -void flb_kinesis_ctx_destroy(struct flb_kinesis *ctx) -{ - if (ctx != NULL) { - if (ctx->base_aws_provider) { - flb_aws_provider_destroy(ctx->base_aws_provider); - } - - if (ctx->aws_provider) { - flb_aws_provider_destroy(ctx->aws_provider); - } - - if (ctx->cred_tls) { - flb_tls_destroy(ctx->cred_tls); - } - - if (ctx->sts_tls) { - flb_tls_destroy(ctx->sts_tls); - } - - if (ctx->client_tls) { - flb_tls_destroy(ctx->client_tls); - } - - if (ctx->kinesis_client) { - flb_aws_client_destroy(ctx->kinesis_client); - } - - if (ctx->custom_endpoint == FLB_FALSE) { - flb_free(ctx->endpoint); - } - - if (ctx->uuid) { - flb_free(ctx->uuid); - } - - flb_free(ctx); - } -} - -static int cb_kinesis_exit(void *data, struct flb_config *config) -{ - struct flb_kinesis *ctx = data; - - flb_kinesis_ctx_destroy(ctx); - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "region", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, region), - "The AWS region of your kinesis stream" - }, - - { - FLB_CONFIG_MAP_STR, "stream", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, stream_name), - "Kinesis stream name" - }, - - { - FLB_CONFIG_MAP_STR, "time_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, time_key), - "Add the timestamp to the record under this key. By default the timestamp " - "from Fluent Bit will not be added to records sent to Kinesis." - }, - - { - FLB_CONFIG_MAP_STR, "time_key_format", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, time_key_format), - "strftime compliant format string for the timestamp; for example, " - "the default is '%Y-%m-%dT%H:%M:%S'. This option is used with time_key. " - }, - - { - FLB_CONFIG_MAP_STR, "role_arn", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, role_arn), - "ARN of an IAM role to assume (ex. for cross account access)." - }, - - { - FLB_CONFIG_MAP_STR, "endpoint", NULL, - 0, FLB_FALSE, 0, - "Specify a custom endpoint for the Kinesis API" - }, - - { - FLB_CONFIG_MAP_STR, "sts_endpoint", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, sts_endpoint), - "Custom endpoint for the STS API." - }, - - { - FLB_CONFIG_MAP_STR, "external_id", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, external_id), - "Specify an external ID for the STS API, can be used with the role_arn parameter if your role " - "requires an external ID." - }, - - { - FLB_CONFIG_MAP_STR, "log_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, log_key), - "By default, the whole log record will be sent to Kinesis. " - "If you specify a key name with this option, then only the value of " - "that key will be sent to Kinesis. For example, if you are using " - "the Fluentd Docker log driver, you can specify `log_key log` and only " - "the log message will be sent to Kinesis." - }, - - { - FLB_CONFIG_MAP_BOOL, "auto_retry_requests", "true", - 0, FLB_TRUE, offsetof(struct flb_kinesis, retry_requests), - "Immediately retry failed requests to AWS services once. This option " - "does not affect the normal Fluent Bit retry mechanism with backoff. " - "Instead, it enables an immediate retry with no delay for networking " - "errors, which may help improve throughput when there are transient/random " - "networking issues." - }, - - { - FLB_CONFIG_MAP_STR, "profile", NULL, - 0, FLB_TRUE, offsetof(struct flb_kinesis, profile), - "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " - "$HOME/.aws/ directory." - }, - - /* EOF */ - {0} -}; - -/* Plugin registration */ -struct flb_output_plugin out_kinesis_streams_plugin = { - .name = "kinesis_streams", - .description = "Send logs to Amazon Kinesis Streams", - .cb_init = cb_kinesis_init, - .cb_flush = cb_kinesis_flush, - .cb_exit = cb_kinesis_exit, - .workers = 1, - .flags = 0, - - /* Configuration */ - .config_map = config_map, -}; diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis.h b/fluent-bit/plugins/out_kinesis_streams/kinesis.h deleted file mode 100644 index 75d41e10..00000000 --- a/fluent-bit/plugins/out_kinesis_streams/kinesis.h +++ /dev/null @@ -1,109 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_KINESIS_H -#define FLB_OUT_KINESIS_H - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_aws_credentials.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_aws_util.h> -#include <fluent-bit/flb_signv4.h> - -#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" - -/* buffers used for each flush */ -struct flush { - /* temporary buffer for storing the serialized event messages */ - char *tmp_buf; - size_t tmp_buf_size; - /* current index of tmp_buf */ - size_t tmp_buf_offset; - - /* projected final size of the payload for this flush */ - size_t data_size; - - /* log records- each of these has a pointer to their message in tmp_buf */ - struct kinesis_event *events; - int events_capacity; - /* current event */ - int event_index; - - /* the payload of the API request */ - char *out_buf; - size_t out_buf_size; - - /* buffer used to temporarily hold an event during processing */ - char *event_buf; - size_t event_buf_size; - - int records_sent; - int records_processed; - - const char *tag; - int tag_len; -}; - -struct kinesis_event { - char *json; - size_t len; - struct timespec timestamp; -}; - -struct flb_kinesis { - /* - * TLS instances can not be re-used. So we have one for: - * - Base cred provider (needed for EKS provider) - * - STS Assume role provider - * - The CloudWatch Logs client for this plugin - */ - struct flb_tls *cred_tls; - struct flb_tls *sts_tls; - struct flb_tls *client_tls; - struct flb_aws_provider *aws_provider; - struct flb_aws_provider *base_aws_provider; - struct flb_aws_client *kinesis_client; - - /* configuration options */ - const char *stream_name; - const char *time_key; - const char *time_key_format; - const char *region; - const char *role_arn; - const char *log_key; - const char *external_id; - int retry_requests; - char *sts_endpoint; - int custom_endpoint; - char *profile; - - /* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */ - char *uuid; - - /* must be freed on shutdown if custom_endpoint is not set */ - char *endpoint; - - /* Plugin output instance reference */ - struct flb_output_instance *ins; -}; - -void flb_kinesis_ctx_destroy(struct flb_kinesis *ctx); - -#endif diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c deleted file mode 100644 index 9124657b..00000000 --- a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c +++ /dev/null @@ -1,987 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_slist.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_macros.h> -#include <fluent-bit/flb_config_map.h> -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_log_event_decoder.h> - -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_aws_credentials.h> -#include <fluent-bit/flb_aws_util.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_base64.h> - -#include <monkey/mk_core.h> -#include <msgpack.h> -#include <string.h> -#include <stdio.h> -#include <stdlib.h> - -#ifndef FLB_SYSTEM_WINDOWS -#include <unistd.h> -#endif - -#include "kinesis_api.h" - -#define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException" - -static struct flb_aws_header put_records_target_header = { - .key = "X-Amz-Target", - .key_len = 12, - .val = "Kinesis_20131202.PutRecords", - .val_len = 27, -}; - -static inline int try_to_write(char *buf, int *off, size_t left, - const char *str, size_t str_len) -{ - if (str_len <= 0){ - str_len = strlen(str); - } - if (left <= *off+str_len) { - return FLB_FALSE; - } - memcpy(buf+*off, str, str_len); - *off += str_len; - return FLB_TRUE; -} - -/* - * Writes the "header" for a put_records payload - */ -static int init_put_payload(struct flb_kinesis *ctx, struct flush *buf, - int *offset) -{ - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "{\"StreamName\":\"", 15)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - ctx->stream_name, 0)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\",\"Records\":[", 13)) { - goto error; - } - return 0; - -error: - return -1; -} - -/* - * Simple and fast hashing algorithm to create random partition keys - */ -static flb_sds_t random_partition_key(const char *tag) -{ - int c; - unsigned long hash = 5381; - unsigned long hash2 = 5381; - flb_sds_t hash_str; - flb_sds_t tmp; - struct flb_time tm; - - /* get current time */ - flb_time_get(&tm); - - /* compose hash */ - while ((c = *tag++)) { - hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ - } - hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; - - /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ - hash_str = flb_sds_create_size(64); - if (!hash_str) { - flb_errno(); - return NULL; - } - tmp = flb_sds_printf(&hash_str, "%lu%lu", hash % 7919, hash2 % 7919); - if (!tmp) { - flb_errno(); - flb_sds_destroy(hash_str); - return NULL; - } - hash_str = tmp; - - return hash_str; -} - -/* - * Writes a log event to the output buffer - */ -static int write_event(struct flb_kinesis *ctx, struct flush *buf, - struct kinesis_event *event, int *offset) -{ - flb_sds_t tag_timestamp = NULL; - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "{\"Data\":\"", 9)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - event->json, event->len)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\",\"PartitionKey\":\"", 18)) { - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - ctx->uuid, 10)) { - goto error; - } - - tag_timestamp = random_partition_key(buf->tag); - if (!tag_timestamp) { - flb_plg_error(ctx->ins, "failed to generate partition key for %s", buf->tag); - goto error; - } - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - tag_timestamp, 0)) { - flb_sds_destroy(tag_timestamp); - goto error; - } - flb_sds_destroy(tag_timestamp); - - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\"}", 2)) { - goto error; - } - - return 0; - -error: - return -1; -} - -/* Terminates a PutRecords payload */ -static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf, - int *offset) -{ - if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "]}", 2)) { - return -1; - } - buf->out_buf[*offset] = '\0'; - - return 0; -} - - -/* - * Processes the msgpack object - * -1 = failure, record not added - * 0 = success, record added - * 1 = we ran out of space, send and retry - * 2 = record could not be processed, discard it - * Returns 0 on success, -1 on general errors, - * and 1 if we ran out of space to write the event - * which means a send must occur - */ -static int process_event(struct flb_kinesis *ctx, struct flush *buf, - const msgpack_object *obj, struct flb_time *tms) -{ - size_t written = 0; - int ret; - size_t size; - size_t b64_len; - struct kinesis_event *event; - char *tmp_buf_ptr; - char *time_key_ptr; - struct tm time_stamp; - struct tm *tmp; - size_t len; - size_t tmp_size; - char *out_buf; - - tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; - ret = flb_msgpack_to_json(tmp_buf_ptr, - buf->tmp_buf_size - buf->tmp_buf_offset, - obj); - if (ret <= 0) { - /* - * negative value means failure to write to buffer, - * which means we ran out of space, and must send the logs - * - * TODO: This could also incorrectly be triggered if the record - * is larger than MAX_EVENT_SIZE - */ - return 1; - } - written = (size_t) ret; - - /* Discard empty messages (written == 2 means '""') */ - if (written <= 2) { - flb_plg_debug(ctx->ins, "Found empty log message, %s", ctx->stream_name); - return 2; - } - - if (ctx->log_key) { - /* - * flb_msgpack_to_json will encase the value in quotes - * We don't want that for log_key, so we ignore the first - * and last character - */ - written -= 2; - tmp_buf_ptr++; /* pass over the opening quote */ - buf->tmp_buf_offset++; - } - - /* is (written + 1) because we still have to append newline */ - if ((written + 1) >= MAX_EVENT_SIZE) { - flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " - "max size allowed by Kinesis, %s", written + 1, - ctx->stream_name); - return 2; - } - - if (ctx->time_key) { - /* append time_key to end of json string */ - tmp = gmtime_r(&tms->tm.tv_sec, &time_stamp); - if (!tmp) { - flb_plg_error(ctx->ins, "Could not create time stamp for %lu unix " - "seconds, discarding record, %s", tms->tm.tv_sec, - ctx->stream_name); - return 2; - } - - /* format time output and return the length */ - len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); - - /* how much space do we have left */ - tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; - if (len > tmp_size) { - /* not enough space - tell caller to retry */ - flb_free(out_buf); - return 1; - } - - if (len == 0) { - /* - * when the length of out_buf is not enough for time_key_format, - * time_key will not be added to record. - */ - flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", - ctx->time_key, ctx->stream_name); - flb_free(out_buf); - } - else { - time_key_ptr = tmp_buf_ptr + written - 1; - memcpy(time_key_ptr, ",", 1); - time_key_ptr++; - memcpy(time_key_ptr, "\"", 1); - time_key_ptr++; - memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); - time_key_ptr += strlen(ctx->time_key); - memcpy(time_key_ptr, "\":\"", 3); - time_key_ptr += 3; - - /* merge out_buf to time_key_ptr */ - memcpy(time_key_ptr, out_buf, len); - flb_free(out_buf); - time_key_ptr += len; - memcpy(time_key_ptr, "\"}", 2); - time_key_ptr += 2; - written = (time_key_ptr - tmp_buf_ptr); - } - } - - /* is (written + 1) because we still have to append newline */ - if ((written + 1) >= MAX_EVENT_SIZE) { - flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " - "max size allowed by Kinesis, %s", written + 1, - ctx->stream_name); - return 2; - } - - /* append newline to record */ - - tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; - if (tmp_size <= 1) { - /* no space left- tell caller to retry */ - return 1; - } - - memcpy(tmp_buf_ptr + written, "\n", 1); - written++; - - /* - * check if event_buf is initialized and big enough - * Base64 encoding will increase size by ~4/3 - */ - size = (written * 1.5) + 4; - if (buf->event_buf == NULL || buf->event_buf_size < size) { - flb_free(buf->event_buf); - buf->event_buf = flb_malloc(size); - buf->event_buf_size = size; - if (buf->event_buf == NULL) { - flb_errno(); - return -1; - } - } - - tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; - ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, - (unsigned char *) tmp_buf_ptr, written); - if (ret != 0) { - flb_errno(); - return -1; - } - written = b64_len; - - tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; - if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) { - /* not enough space, send logs */ - return 1; - } - - /* copy serialized json to tmp_buf */ - memcpy(tmp_buf_ptr, buf->event_buf, written); - - buf->tmp_buf_offset += written; - event = &buf->events[buf->event_index]; - event->json = tmp_buf_ptr; - event->len = written; - event->timestamp.tv_sec = tms->tm.tv_sec; - event->timestamp.tv_nsec = tms->tm.tv_nsec; - - return 0; -} - -/* Resets or inits a flush struct */ -static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) { - buf->event_index = 0; - buf->tmp_buf_offset = 0; - buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN; - buf->data_size += strlen(ctx->stream_name); -} - -/* constructs a put payload, and then sends */ -static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) { - int ret; - int offset; - int i; - struct kinesis_event *event; - - if (buf->event_index <= 0) { - /* - * event_index should always be 1 more than the actual last event index - * when this function is called. - * Except in the case where send_log_events() is called at the end of - * process_and_send_to_kinesis. If all records were already sent, event_index - * will be 0. Hence this check. - */ - return 0; - } - - /* alloc out_buf if needed */ - if (buf->out_buf == NULL || buf->out_buf_size < buf->data_size) { - if (buf->out_buf != NULL) { - flb_free(buf->out_buf); - } - buf->out_buf = flb_malloc(buf->data_size + 1); - if (!buf->out_buf) { - flb_errno(); - return -1; - } - buf->out_buf_size = buf->data_size; - } - - offset = 0; - ret = init_put_payload(ctx, buf, &offset); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to initialize PutRecords payload, %s", - ctx->stream_name); - return -1; - } - - for (i = 0; i < buf->event_index; i++) { - event = &buf->events[i]; - ret = write_event(ctx, buf, event, &offset); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to write log record %d to " - "payload buffer, %s", i, ctx->stream_name); - return -1; - } - if (i != (buf->event_index -1)) { - if (!try_to_write(buf->out_buf, &offset, buf->out_buf_size, - ",", 1)) { - flb_plg_error(ctx->ins, "Could not terminate record with ','"); - return -1; - } - } - } - - ret = end_put_payload(ctx, buf, &offset); - if (ret < 0) { - flb_plg_error(ctx->ins, "Could not complete PutRecords payload"); - return -1; - } - flb_plg_debug(ctx->ins, "kinesis:PutRecords: events=%d, payload=%d bytes", i, offset); - ret = put_records(ctx, buf, (size_t) offset, i); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to send log records"); - return -1; - } - buf->records_sent += i; - - return 0; -} - -/* - * Processes the msgpack object, sends the current batch if needed - */ -static int add_event(struct flb_kinesis *ctx, struct flush *buf, - const msgpack_object *obj, struct flb_time *tms) -{ - int ret; - struct kinesis_event *event; - int retry_add = FLB_FALSE; - size_t event_bytes = 0; - - if (buf->event_index == 0) { - /* init */ - reset_flush_buf(ctx, buf); - } - -retry_add_event: - retry_add = FLB_FALSE; - ret = process_event(ctx, buf, obj, tms); - if (ret < 0) { - return -1; - } - else if (ret == 1) { - if (buf->event_index <= 0) { - /* somehow the record was larger than our entire request buffer */ - flb_plg_warn(ctx->ins, "Discarding massive log record, %s", - ctx->stream_name); - return 0; /* discard this record and return to caller */ - } - /* send logs and then retry the add */ - retry_add = FLB_TRUE; - goto send; - } else if (ret == 2) { - /* discard this record and return to caller */ - flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", - ctx->stream_name); - return 0; - } - - event = &buf->events[buf->event_index]; - event_bytes = event->len + PUT_RECORDS_PER_RECORD_LEN; - - if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) { - if (buf->event_index <= 0) { - /* somehow the record was larger than our entire request buffer */ - flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s", - event_bytes, ctx->stream_name); - return 0; /* discard this record and return to caller */ - } - /* do not send this event */ - retry_add = FLB_TRUE; - goto send; - } - - /* send is not needed yet, return to caller */ - buf->data_size += event_bytes; - buf->event_index++; - - if (buf->event_index == MAX_EVENTS_PER_PUT) { - goto send; - } - - return 0; - -send: - ret = send_log_events(ctx, buf); - reset_flush_buf(ctx, buf); - if (ret < 0) { - return -1; - } - - if (retry_add == FLB_TRUE) { - goto retry_add_event; - } - - return 0; -} - -/* - * Main routine- processes msgpack and sends in batches - * return value is the number of events processed (number sent is stored in buf) - */ -int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, - const char *data, size_t bytes) -{ - int i = 0; - size_t map_size; - msgpack_object map; - msgpack_object_kv *kv; - msgpack_object key; - msgpack_object val; - char *key_str = NULL; - size_t key_str_size = 0; - int j; - int ret; - int check = FLB_FALSE; - int found = FLB_FALSE; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - map = *log_event.body; - map_size = map.via.map.size; - - if (ctx->log_key) { - key_str = NULL; - key_str_size = 0; - check = FLB_FALSE; - found = FLB_FALSE; - - kv = map.via.map.ptr; - - for(j=0; j < map_size; j++) { - key = (kv+j)->key; - if (key.type == MSGPACK_OBJECT_BIN) { - key_str = (char *) key.via.bin.ptr; - key_str_size = key.via.bin.size; - check = FLB_TRUE; - } - if (key.type == MSGPACK_OBJECT_STR) { - key_str = (char *) key.via.str.ptr; - key_str_size = key.via.str.size; - check = FLB_TRUE; - } - - if (check == FLB_TRUE) { - if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { - found = FLB_TRUE; - val = (kv+j)->val; - ret = add_event(ctx, buf, &val, &log_event.timestamp); - if (ret < 0 ) { - goto error; - } - } - } - - } - if (found == FLB_FALSE) { - flb_plg_error(ctx->ins, "Could not find log_key '%s' in record, %s", - ctx->log_key, ctx->stream_name); - } - else { - i++; - } - continue; - } - - ret = add_event(ctx, buf, &map, &log_event.timestamp); - if (ret < 0 ) { - goto error; - } - i++; - } - - flb_log_event_decoder_destroy(&log_decoder); - - /* send any remaining events */ - ret = send_log_events(ctx, buf); - reset_flush_buf(ctx, buf); - if (ret < 0) { - return -1; - } - - /* return number of events processed */ - buf->records_processed = i; - return i; - -error: - flb_log_event_decoder_destroy(&log_decoder); - - return -1; -} - -/* - * Returns number of failed records on success, -1 on failure - */ -static int process_api_response(struct flb_kinesis *ctx, - struct flb_http_client *c) -{ - int i; - int k; - int w; - int ret; - int failed_records = -1; - int root_type; - char *out_buf; - int throughput_exceeded = FLB_FALSE; - size_t off = 0; - size_t out_size; - msgpack_unpacked result; - msgpack_object root; - msgpack_object key; - msgpack_object val; - msgpack_object response; - msgpack_object response_key; - msgpack_object response_val; - - if (strstr(c->resp.payload, "\"FailedRecordCount\":0")) { - return 0; - } - - /* Convert JSON payload to msgpack */ - ret = flb_pack_json(c->resp.payload, c->resp.payload_size, - &out_buf, &out_size, &root_type, NULL); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not pack/validate JSON API response\n%s", - c->resp.payload); - return -1; - } - - /* Lookup error field */ - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, out_buf, out_size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", - c->resp.payload); - failed_records = -1; - goto done; - } - - root = result.data; - if (root.type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "unexpected payload type=%i", - root.type); - failed_records = -1; - goto done; - } - - for (i = 0; i < root.via.map.size; i++) { - key = root.via.map.ptr[i].key; - if (key.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unexpected key type=%i", - key.type); - failed_records = -1; - goto done; - } - - if (key.via.str.size >= 14 && - strncmp(key.via.str.ptr, "FailedRecordCount", 14) == 0) { - val = root.via.map.ptr[i].val; - if (val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - flb_plg_error(ctx->ins, "unexpected 'FailedRecordCount' value type=%i", - val.type); - failed_records = -1; - goto done; - } - - failed_records = val.via.u64; - if (failed_records == 0) { - /* no need to check RequestResponses field */ - goto done; - } - } - - if (key.via.str.size >= 14 && - strncmp(key.via.str.ptr, "Records", 7) == 0) { - val = root.via.map.ptr[i].val; - if (val.type != MSGPACK_OBJECT_ARRAY) { - flb_plg_error(ctx->ins, "unexpected 'Records' value type=%i", - val.type); - failed_records = -1; - goto done; - } - - if (val.via.array.size == 0) { - flb_plg_error(ctx->ins, "'Records' field in response is empty"); - failed_records = -1; - goto done; - } - - for (k = 0; k < val.via.array.size; k++) { - /* iterate through the responses */ - response = val.via.array.ptr[k]; - if (response.type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "unexpected 'Records[%d]' value type=%i", - k, response.type); - failed_records = -1; - goto done; - } - for (w = 0; w < response.via.map.size; w++) { - /* iterate through the response's keys */ - response_key = response.via.map.ptr[w].key; - if (response_key.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unexpected key type=%i", - response_key.type); - failed_records = -1; - goto done; - } - if (response_key.via.str.size >= 9 && - strncmp(response_key.via.str.ptr, "ErrorCode", 9) == 0) { - response_val = response.via.map.ptr[w].val; - if (!throughput_exceeded && - response_val.via.str.size >= 38 && - (strncmp(response_val.via.str.ptr, - ERR_CODE_EXCEEDED_THROUGHPUT, 38) == 0)) { - throughput_exceeded = FLB_TRUE; - flb_plg_error(ctx->ins, "Throughput limits may have been exceeded, %s", - ctx->stream_name); - } - flb_plg_debug(ctx->ins, "Record %i failed with err_code=%.*s", - k, response_val.via.str.size, - response_val.via.str.ptr); - } - if (response_key.via.str.size >= 12 && - strncmp(response_key.via.str.ptr, "ErrorMessage", 12) == 0) { - response_val = response.via.map.ptr[w].val; - flb_plg_debug(ctx->ins, "Record %i failed with err_msg=%.*s", - k, response_val.via.str.size, - response_val.via.str.ptr); - } - } - } - } - } - - done: - flb_free(out_buf); - msgpack_unpacked_destroy(&result); - return failed_records; -} - -static int plugin_under_test() -{ - if (getenv("FLB_KINESIS_PLUGIN_UNDER_TEST") != NULL) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static char *mock_error_response(char *error_env_var) -{ - char *err_val = NULL; - char *error = NULL; - int len = 0; - - err_val = getenv(error_env_var); - if (err_val != NULL && strlen(err_val) > 0) { - error = flb_malloc(strlen(err_val) + sizeof(char)); - if (error == NULL) { - flb_errno(); - return NULL; - } - - len = strlen(err_val); - memcpy(error, err_val, len); - error[len] = '\0'; - return error; - } - - return NULL; -} - -static int partial_success() -{ - char *err_val = NULL; - - err_val = getenv("PARTIAL_SUCCESS_CASE"); - if (err_val != NULL && strlen(err_val) > 0) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static struct flb_http_client *mock_http_call(char *error_env_var) -{ - /* create an http client so that we can set the response */ - struct flb_http_client *c = NULL; - char *error = mock_error_response(error_env_var); - - c = flb_calloc(1, sizeof(struct flb_http_client)); - if (!c) { - flb_errno(); - flb_free(error); - return NULL; - } - mk_list_init(&c->headers); - - if (error != NULL) { - c->resp.status = 400; - /* resp.data is freed on destroy, payload is supposed to reference it */ - c->resp.data = error; - c->resp.payload = c->resp.data; - c->resp.payload_size = strlen(error); - } - else { - c->resp.status = 200; - c->resp.payload = ""; - c->resp.payload_size = 0; - if (partial_success() == FLB_TRUE) { - /* mocked partial failure response */ - c->resp.payload = "{\"FailedRecordCount\":2,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395012686\",\"ShardId\":\"shardId-000000000000\"},{\"ErrorCode\":\"ProvisionedThroughputExceededException\",\"ErrorMessage\":\"Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111.\"},{\"ErrorCode\":\"InternalFailure\",\"ErrorMessage\":\"Internal service failure.\"}]}"; - c->resp.payload_size = strlen(c->resp.payload); - } - else { - /* mocked success response */ - c->resp.payload = "{\"FailedRecordCount\":0,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395019266\",\"ShardId\":\"shardId-000000000000\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293522\",\"ShardId\":\"shardId-000000000001\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293588\",\"ShardId\":\"shardId-000000000003\"}]}"; - c->resp.payload_size = strlen(c->resp.payload); - } - } - - return c; -} - - -/* - * Returns -1 on failure, 0 on success - */ -int put_records(struct flb_kinesis *ctx, struct flush *buf, - size_t payload_size, int num_records) -{ - - struct flb_http_client *c = NULL; - struct flb_aws_client *kinesis_client; - flb_sds_t error; - int failed_records = 0; - - flb_plg_debug(ctx->ins, "Sending log records to stream %s", - ctx->stream_name); - - if (plugin_under_test() == FLB_TRUE) { - c = mock_http_call("TEST_PUT_RECORDS_ERROR"); - } - else { - kinesis_client = ctx->kinesis_client; - c = kinesis_client->client_vtable->request(kinesis_client, FLB_HTTP_POST, - "/", buf->out_buf, payload_size, - &put_records_target_header, 1); - } - - if (c) { - flb_plg_debug(ctx->ins, "PutRecords http status=%d", c->resp.status); - - if (c->resp.status == 200) { - /* Kinesis API can return partial success- check response */ - if (c->resp.payload_size > 0) { - failed_records = process_api_response(ctx, c); - if (failed_records < 0) { - flb_plg_error(ctx->ins, "PutRecords response " - "could not be parsed, %s", - c->resp.payload); - flb_http_client_destroy(c); - return -1; - } - if (failed_records == num_records) { - flb_plg_error(ctx->ins, "PutRecords request returned " - "with no records successfully recieved, %s", - ctx->stream_name); - flb_http_client_destroy(c); - return -1; - } - if (failed_records > 0) { - flb_plg_error(ctx->ins, "%d out of %d records failed to be " - "delivered, will retry this batch, %s", - failed_records, num_records, - ctx->stream_name); - flb_http_client_destroy(c); - return -1; - } - } - flb_plg_debug(ctx->ins, "Sent events to %s", ctx->stream_name); - flb_http_client_destroy(c); - return 0; - } - - /* Check error */ - if (c->resp.payload_size > 0) { - error = flb_aws_error(c->resp.payload, c->resp.payload_size); - if (error != NULL) { - if (strcmp(error, ERR_CODE_EXCEEDED_THROUGHPUT) == 0) { - flb_plg_error(ctx->ins, "Throughput limits for %s " - "may have been exceeded.", - ctx->stream_name); - } - if (strncmp(error, "SerializationException", 22) == 0) { - /* - * If this happens, we habe a bug in the code - * User should send us the output to debug - */ - flb_plg_error(ctx->ins, "<<------Bug in Code------>>"); - printf("Malformed request: %s", buf->out_buf); - } - flb_aws_print_error(c->resp.payload, c->resp.payload_size, - "PutRecords", ctx->ins); - flb_sds_destroy(error); - } - else { - /* error could not be parsed, print raw response to debug */ - flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); - } - } - } - - flb_plg_error(ctx->ins, "Failed to send log records to %s", ctx->stream_name); - if (c) { - flb_http_client_destroy(c); - } - return -1; -} - - -void kinesis_flush_destroy(struct flush *buf) -{ - if (buf) { - flb_free(buf->tmp_buf); - flb_free(buf->out_buf); - flb_free(buf->events); - flb_free(buf->event_buf); - flb_free(buf); - } -} diff --git a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h b/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h deleted file mode 100644 index e44de6d4..00000000 --- a/fluent-bit/plugins/out_kinesis_streams/kinesis_api.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_KINESIS_API -#define FLB_OUT_KINESIS_API - -#define PUT_RECORDS_PAYLOAD_SIZE 5242880 -#define MAX_EVENTS_PER_PUT 500 -#define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */ - -/* number of characters needed to 'start' a PutRecords payload */ -#define PUT_RECORDS_HEADER_LEN 30 -/* number of characters needed per record in a PutRecords payload */ -#define PUT_RECORDS_PER_RECORD_LEN 48 -/* number of characters needed to 'end' a PutRecords payload */ -#define PUT_RECORDS_FOOTER_LEN 4 - -#include "kinesis.h" - -void kinesis_flush_destroy(struct flush *buf); - -int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, - const char *data, size_t bytes); - -int put_records(struct flb_kinesis *ctx, struct flush *buf, - size_t payload_size, int num_records); - -#endif |