From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../plugins/out_kinesis_streams/kinesis_api.c | 987 +++++++++++++++++++++ 1 file changed, 987 insertions(+) create mode 100644 src/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c (limited to 'src/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c') diff --git a/src/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c b/src/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c new file mode 100644 index 000000000..9124657bc --- /dev/null +++ b/src/fluent-bit/plugins/out_kinesis_streams/kinesis_api.c @@ -0,0 +1,987 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifndef FLB_SYSTEM_WINDOWS +#include +#endif + +#include "kinesis_api.h" + +#define ERR_CODE_EXCEEDED_THROUGHPUT "ProvisionedThroughputExceededException" + +static struct flb_aws_header put_records_target_header = { + .key = "X-Amz-Target", + .key_len = 12, + .val = "Kinesis_20131202.PutRecords", + .val_len = 27, +}; + +static inline int try_to_write(char *buf, int *off, size_t left, + const char *str, size_t str_len) +{ + if (str_len <= 0){ + str_len = strlen(str); + } + if (left <= *off+str_len) { + return FLB_FALSE; + } + memcpy(buf+*off, str, str_len); + *off += str_len; + return FLB_TRUE; +} + +/* + * Writes the "header" for a put_records payload + */ +static int init_put_payload(struct flb_kinesis *ctx, struct flush *buf, + int *offset) +{ + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "{\"StreamName\":\"", 15)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + ctx->stream_name, 0)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\",\"Records\":[", 13)) { + goto error; + } + return 0; + +error: + return -1; +} + +/* + * Simple and fast hashing algorithm to create random partition keys + */ +static flb_sds_t random_partition_key(const char *tag) +{ + int c; + unsigned long hash = 5381; + unsigned long hash2 = 5381; + flb_sds_t hash_str; + flb_sds_t tmp; + struct flb_time tm; + + /* get current time */ + flb_time_get(&tm); + + /* compose hash */ + while ((c = *tag++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; + + /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ + hash_str = flb_sds_create_size(64); + if (!hash_str) { + flb_errno(); + return NULL; + } + tmp = flb_sds_printf(&hash_str, "%lu%lu", hash % 7919, hash2 % 7919); + if (!tmp) { + flb_errno(); + flb_sds_destroy(hash_str); + return NULL; + } + hash_str = tmp; + + return hash_str; +} + +/* + * Writes a log event to the output buffer + */ +static int write_event(struct flb_kinesis *ctx, struct flush *buf, + struct kinesis_event *event, int *offset) +{ + flb_sds_t tag_timestamp = NULL; + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "{\"Data\":\"", 9)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + event->json, event->len)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\",\"PartitionKey\":\"", 18)) { + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + ctx->uuid, 10)) { + goto error; + } + + tag_timestamp = random_partition_key(buf->tag); + if (!tag_timestamp) { + flb_plg_error(ctx->ins, "failed to generate partition key for %s", buf->tag); + goto error; + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + tag_timestamp, 0)) { + flb_sds_destroy(tag_timestamp); + goto error; + } + flb_sds_destroy(tag_timestamp); + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"}", 2)) { + goto error; + } + + return 0; + +error: + return -1; +} + +/* Terminates a PutRecords payload */ +static int end_put_payload(struct flb_kinesis *ctx, struct flush *buf, + int *offset) +{ + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "]}", 2)) { + return -1; + } + buf->out_buf[*offset] = '\0'; + + return 0; +} + + +/* + * Processes the msgpack object + * -1 = failure, record not added + * 0 = success, record added + * 1 = we ran out of space, send and retry + * 2 = record could not be processed, discard it + * Returns 0 on success, -1 on general errors, + * and 1 if we ran out of space to write the event + * which means a send must occur + */ +static int process_event(struct flb_kinesis *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms) +{ + size_t written = 0; + int ret; + size_t size; + size_t b64_len; + struct kinesis_event *event; + char *tmp_buf_ptr; + char *time_key_ptr; + struct tm time_stamp; + struct tm *tmp; + size_t len; + size_t tmp_size; + char *out_buf; + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + ret = flb_msgpack_to_json(tmp_buf_ptr, + buf->tmp_buf_size - buf->tmp_buf_offset, + obj); + if (ret <= 0) { + /* + * negative value means failure to write to buffer, + * which means we ran out of space, and must send the logs + * + * TODO: This could also incorrectly be triggered if the record + * is larger than MAX_EVENT_SIZE + */ + return 1; + } + written = (size_t) ret; + + /* Discard empty messages (written == 2 means '""') */ + if (written <= 2) { + flb_plg_debug(ctx->ins, "Found empty log message, %s", ctx->stream_name); + return 2; + } + + if (ctx->log_key) { + /* + * flb_msgpack_to_json will encase the value in quotes + * We don't want that for log_key, so we ignore the first + * and last character + */ + written -= 2; + tmp_buf_ptr++; /* pass over the opening quote */ + buf->tmp_buf_offset++; + } + + /* is (written + 1) because we still have to append newline */ + if ((written + 1) >= MAX_EVENT_SIZE) { + flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " + "max size allowed by Kinesis, %s", written + 1, + ctx->stream_name); + return 2; + } + + if (ctx->time_key) { + /* append time_key to end of json string */ + tmp = gmtime_r(&tms->tm.tv_sec, &time_stamp); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not create time stamp for %lu unix " + "seconds, discarding record, %s", tms->tm.tv_sec, + ctx->stream_name); + return 2; + } + + /* format time output and return the length */ + len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); + + /* how much space do we have left */ + tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; + if (len > tmp_size) { + /* not enough space - tell caller to retry */ + flb_free(out_buf); + return 1; + } + + if (len == 0) { + /* + * when the length of out_buf is not enough for time_key_format, + * time_key will not be added to record. + */ + flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", + ctx->time_key, ctx->stream_name); + flb_free(out_buf); + } + else { + time_key_ptr = tmp_buf_ptr + written - 1; + memcpy(time_key_ptr, ",", 1); + time_key_ptr++; + memcpy(time_key_ptr, "\"", 1); + time_key_ptr++; + memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); + time_key_ptr += strlen(ctx->time_key); + memcpy(time_key_ptr, "\":\"", 3); + time_key_ptr += 3; + + /* merge out_buf to time_key_ptr */ + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); + time_key_ptr += len; + memcpy(time_key_ptr, "\"}", 2); + time_key_ptr += 2; + written = (time_key_ptr - tmp_buf_ptr); + } + } + + /* is (written + 1) because we still have to append newline */ + if ((written + 1) >= MAX_EVENT_SIZE) { + flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than " + "max size allowed by Kinesis, %s", written + 1, + ctx->stream_name); + return 2; + } + + /* append newline to record */ + + tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; + if (tmp_size <= 1) { + /* no space left- tell caller to retry */ + return 1; + } + + memcpy(tmp_buf_ptr + written, "\n", 1); + written++; + + /* + * check if event_buf is initialized and big enough + * Base64 encoding will increase size by ~4/3 + */ + size = (written * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) tmp_buf_ptr, written); + if (ret != 0) { + flb_errno(); + return -1; + } + written = b64_len; + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) { + /* not enough space, send logs */ + return 1; + } + + /* copy serialized json to tmp_buf */ + memcpy(tmp_buf_ptr, buf->event_buf, written); + + buf->tmp_buf_offset += written; + event = &buf->events[buf->event_index]; + event->json = tmp_buf_ptr; + event->len = written; + event->timestamp.tv_sec = tms->tm.tv_sec; + event->timestamp.tv_nsec = tms->tm.tv_nsec; + + return 0; +} + +/* Resets or inits a flush struct */ +static void reset_flush_buf(struct flb_kinesis *ctx, struct flush *buf) { + buf->event_index = 0; + buf->tmp_buf_offset = 0; + buf->data_size = PUT_RECORDS_HEADER_LEN + PUT_RECORDS_FOOTER_LEN; + buf->data_size += strlen(ctx->stream_name); +} + +/* constructs a put payload, and then sends */ +static int send_log_events(struct flb_kinesis *ctx, struct flush *buf) { + int ret; + int offset; + int i; + struct kinesis_event *event; + + if (buf->event_index <= 0) { + /* + * event_index should always be 1 more than the actual last event index + * when this function is called. + * Except in the case where send_log_events() is called at the end of + * process_and_send_to_kinesis. If all records were already sent, event_index + * will be 0. Hence this check. + */ + return 0; + } + + /* alloc out_buf if needed */ + if (buf->out_buf == NULL || buf->out_buf_size < buf->data_size) { + if (buf->out_buf != NULL) { + flb_free(buf->out_buf); + } + buf->out_buf = flb_malloc(buf->data_size + 1); + if (!buf->out_buf) { + flb_errno(); + return -1; + } + buf->out_buf_size = buf->data_size; + } + + offset = 0; + ret = init_put_payload(ctx, buf, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize PutRecords payload, %s", + ctx->stream_name); + return -1; + } + + for (i = 0; i < buf->event_index; i++) { + event = &buf->events[i]; + ret = write_event(ctx, buf, event, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to write log record %d to " + "payload buffer, %s", i, ctx->stream_name); + return -1; + } + if (i != (buf->event_index -1)) { + if (!try_to_write(buf->out_buf, &offset, buf->out_buf_size, + ",", 1)) { + flb_plg_error(ctx->ins, "Could not terminate record with ','"); + return -1; + } + } + } + + ret = end_put_payload(ctx, buf, &offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not complete PutRecords payload"); + return -1; + } + flb_plg_debug(ctx->ins, "kinesis:PutRecords: events=%d, payload=%d bytes", i, offset); + ret = put_records(ctx, buf, (size_t) offset, i); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to send log records"); + return -1; + } + buf->records_sent += i; + + return 0; +} + +/* + * Processes the msgpack object, sends the current batch if needed + */ +static int add_event(struct flb_kinesis *ctx, struct flush *buf, + const msgpack_object *obj, struct flb_time *tms) +{ + int ret; + struct kinesis_event *event; + int retry_add = FLB_FALSE; + size_t event_bytes = 0; + + if (buf->event_index == 0) { + /* init */ + reset_flush_buf(ctx, buf); + } + +retry_add_event: + retry_add = FLB_FALSE; + ret = process_event(ctx, buf, obj, tms); + if (ret < 0) { + return -1; + } + else if (ret == 1) { + if (buf->event_index <= 0) { + /* somehow the record was larger than our entire request buffer */ + flb_plg_warn(ctx->ins, "Discarding massive log record, %s", + ctx->stream_name); + return 0; /* discard this record and return to caller */ + } + /* send logs and then retry the add */ + retry_add = FLB_TRUE; + goto send; + } else if (ret == 2) { + /* discard this record and return to caller */ + flb_plg_warn(ctx->ins, "Discarding large or unprocessable record, %s", + ctx->stream_name); + return 0; + } + + event = &buf->events[buf->event_index]; + event_bytes = event->len + PUT_RECORDS_PER_RECORD_LEN; + + if ((buf->data_size + event_bytes) > PUT_RECORDS_PAYLOAD_SIZE) { + if (buf->event_index <= 0) { + /* somehow the record was larger than our entire request buffer */ + flb_plg_warn(ctx->ins, "[size=%zu] Discarding massive log record, %s", + event_bytes, ctx->stream_name); + return 0; /* discard this record and return to caller */ + } + /* do not send this event */ + retry_add = FLB_TRUE; + goto send; + } + + /* send is not needed yet, return to caller */ + buf->data_size += event_bytes; + buf->event_index++; + + if (buf->event_index == MAX_EVENTS_PER_PUT) { + goto send; + } + + return 0; + +send: + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + + if (retry_add == FLB_TRUE) { + goto retry_add_event; + } + + return 0; +} + +/* + * Main routine- processes msgpack and sends in batches + * return value is the number of events processed (number sent is stored in buf) + */ +int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, + const char *data, size_t bytes) +{ + int i = 0; + size_t map_size; + msgpack_object map; + msgpack_object_kv *kv; + msgpack_object key; + msgpack_object val; + char *key_str = NULL; + size_t key_str_size = 0; + int j; + int ret; + int check = FLB_FALSE; + int found = FLB_FALSE; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return -1; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + map = *log_event.body; + map_size = map.via.map.size; + + if (ctx->log_key) { + key_str = NULL; + key_str_size = 0; + check = FLB_FALSE; + found = FLB_FALSE; + + kv = map.via.map.ptr; + + for(j=0; j < map_size; j++) { + key = (kv+j)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { + found = FLB_TRUE; + val = (kv+j)->val; + ret = add_event(ctx, buf, &val, &log_event.timestamp); + if (ret < 0 ) { + goto error; + } + } + } + + } + if (found == FLB_FALSE) { + flb_plg_error(ctx->ins, "Could not find log_key '%s' in record, %s", + ctx->log_key, ctx->stream_name); + } + else { + i++; + } + continue; + } + + ret = add_event(ctx, buf, &map, &log_event.timestamp); + if (ret < 0 ) { + goto error; + } + i++; + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* send any remaining events */ + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); + if (ret < 0) { + return -1; + } + + /* return number of events processed */ + buf->records_processed = i; + return i; + +error: + flb_log_event_decoder_destroy(&log_decoder); + + return -1; +} + +/* + * Returns number of failed records on success, -1 on failure + */ +static int process_api_response(struct flb_kinesis *ctx, + struct flb_http_client *c) +{ + int i; + int k; + int w; + int ret; + int failed_records = -1; + int root_type; + char *out_buf; + int throughput_exceeded = FLB_FALSE; + size_t off = 0; + size_t out_size; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_object response; + msgpack_object response_key; + msgpack_object response_val; + + if (strstr(c->resp.payload, "\"FailedRecordCount\":0")) { + return 0; + } + + /* Convert JSON payload to msgpack */ + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &out_buf, &out_size, &root_type, NULL); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not pack/validate JSON API response\n%s", + c->resp.payload); + return -1; + } + + /* Lookup error field */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf, out_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", + c->resp.payload); + failed_records = -1; + goto done; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected payload type=%i", + root.type); + failed_records = -1; + goto done; + } + + for (i = 0; i < root.via.map.size; i++) { + key = root.via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unexpected key type=%i", + key.type); + failed_records = -1; + goto done; + } + + if (key.via.str.size >= 14 && + strncmp(key.via.str.ptr, "FailedRecordCount", 14) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + flb_plg_error(ctx->ins, "unexpected 'FailedRecordCount' value type=%i", + val.type); + failed_records = -1; + goto done; + } + + failed_records = val.via.u64; + if (failed_records == 0) { + /* no need to check RequestResponses field */ + goto done; + } + } + + if (key.via.str.size >= 14 && + strncmp(key.via.str.ptr, "Records", 7) == 0) { + val = root.via.map.ptr[i].val; + if (val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected 'Records' value type=%i", + val.type); + failed_records = -1; + goto done; + } + + if (val.via.array.size == 0) { + flb_plg_error(ctx->ins, "'Records' field in response is empty"); + failed_records = -1; + goto done; + } + + for (k = 0; k < val.via.array.size; k++) { + /* iterate through the responses */ + response = val.via.array.ptr[k]; + if (response.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected 'Records[%d]' value type=%i", + k, response.type); + failed_records = -1; + goto done; + } + for (w = 0; w < response.via.map.size; w++) { + /* iterate through the response's keys */ + response_key = response.via.map.ptr[w].key; + if (response_key.type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unexpected key type=%i", + response_key.type); + failed_records = -1; + goto done; + } + if (response_key.via.str.size >= 9 && + strncmp(response_key.via.str.ptr, "ErrorCode", 9) == 0) { + response_val = response.via.map.ptr[w].val; + if (!throughput_exceeded && + response_val.via.str.size >= 38 && + (strncmp(response_val.via.str.ptr, + ERR_CODE_EXCEEDED_THROUGHPUT, 38) == 0)) { + throughput_exceeded = FLB_TRUE; + flb_plg_error(ctx->ins, "Throughput limits may have been exceeded, %s", + ctx->stream_name); + } + flb_plg_debug(ctx->ins, "Record %i failed with err_code=%.*s", + k, response_val.via.str.size, + response_val.via.str.ptr); + } + if (response_key.via.str.size >= 12 && + strncmp(response_key.via.str.ptr, "ErrorMessage", 12) == 0) { + response_val = response.via.map.ptr[w].val; + flb_plg_debug(ctx->ins, "Record %i failed with err_msg=%.*s", + k, response_val.via.str.size, + response_val.via.str.ptr); + } + } + } + } + } + + done: + flb_free(out_buf); + msgpack_unpacked_destroy(&result); + return failed_records; +} + +static int plugin_under_test() +{ + if (getenv("FLB_KINESIS_PLUGIN_UNDER_TEST") != NULL) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static char *mock_error_response(char *error_env_var) +{ + char *err_val = NULL; + char *error = NULL; + int len = 0; + + err_val = getenv(error_env_var); + if (err_val != NULL && strlen(err_val) > 0) { + error = flb_malloc(strlen(err_val) + sizeof(char)); + if (error == NULL) { + flb_errno(); + return NULL; + } + + len = strlen(err_val); + memcpy(error, err_val, len); + error[len] = '\0'; + return error; + } + + return NULL; +} + +static int partial_success() +{ + char *err_val = NULL; + + err_val = getenv("PARTIAL_SUCCESS_CASE"); + if (err_val != NULL && strlen(err_val) > 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static struct flb_http_client *mock_http_call(char *error_env_var) +{ + /* create an http client so that we can set the response */ + struct flb_http_client *c = NULL; + char *error = mock_error_response(error_env_var); + + c = flb_calloc(1, sizeof(struct flb_http_client)); + if (!c) { + flb_errno(); + flb_free(error); + return NULL; + } + mk_list_init(&c->headers); + + if (error != NULL) { + c->resp.status = 400; + /* resp.data is freed on destroy, payload is supposed to reference it */ + c->resp.data = error; + c->resp.payload = c->resp.data; + c->resp.payload_size = strlen(error); + } + else { + c->resp.status = 200; + c->resp.payload = ""; + c->resp.payload_size = 0; + if (partial_success() == FLB_TRUE) { + /* mocked partial failure response */ + c->resp.payload = "{\"FailedRecordCount\":2,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395012686\",\"ShardId\":\"shardId-000000000000\"},{\"ErrorCode\":\"ProvisionedThroughputExceededException\",\"ErrorMessage\":\"Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111.\"},{\"ErrorCode\":\"InternalFailure\",\"ErrorMessage\":\"Internal service failure.\"}]}"; + c->resp.payload_size = strlen(c->resp.payload); + } + else { + /* mocked success response */ + c->resp.payload = "{\"FailedRecordCount\":0,\"Records\":[{\"SequenceNumber\":\"49543463076548007577105092703039560359975228518395019266\",\"ShardId\":\"shardId-000000000000\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293522\",\"ShardId\":\"shardId-000000000001\"},{\"SequenceNumber\":\"49543463076570308322303623326179887152428262250726293588\",\"ShardId\":\"shardId-000000000003\"}]}"; + c->resp.payload_size = strlen(c->resp.payload); + } + } + + return c; +} + + +/* + * Returns -1 on failure, 0 on success + */ +int put_records(struct flb_kinesis *ctx, struct flush *buf, + size_t payload_size, int num_records) +{ + + struct flb_http_client *c = NULL; + struct flb_aws_client *kinesis_client; + flb_sds_t error; + int failed_records = 0; + + flb_plg_debug(ctx->ins, "Sending log records to stream %s", + ctx->stream_name); + + if (plugin_under_test() == FLB_TRUE) { + c = mock_http_call("TEST_PUT_RECORDS_ERROR"); + } + else { + kinesis_client = ctx->kinesis_client; + c = kinesis_client->client_vtable->request(kinesis_client, FLB_HTTP_POST, + "/", buf->out_buf, payload_size, + &put_records_target_header, 1); + } + + if (c) { + flb_plg_debug(ctx->ins, "PutRecords http status=%d", c->resp.status); + + if (c->resp.status == 200) { + /* Kinesis API can return partial success- check response */ + if (c->resp.payload_size > 0) { + failed_records = process_api_response(ctx, c); + if (failed_records < 0) { + flb_plg_error(ctx->ins, "PutRecords response " + "could not be parsed, %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + if (failed_records == num_records) { + flb_plg_error(ctx->ins, "PutRecords request returned " + "with no records successfully recieved, %s", + ctx->stream_name); + flb_http_client_destroy(c); + return -1; + } + if (failed_records > 0) { + flb_plg_error(ctx->ins, "%d out of %d records failed to be " + "delivered, will retry this batch, %s", + failed_records, num_records, + ctx->stream_name); + flb_http_client_destroy(c); + return -1; + } + } + flb_plg_debug(ctx->ins, "Sent events to %s", ctx->stream_name); + flb_http_client_destroy(c); + return 0; + } + + /* Check error */ + if (c->resp.payload_size > 0) { + error = flb_aws_error(c->resp.payload, c->resp.payload_size); + if (error != NULL) { + if (strcmp(error, ERR_CODE_EXCEEDED_THROUGHPUT) == 0) { + flb_plg_error(ctx->ins, "Throughput limits for %s " + "may have been exceeded.", + ctx->stream_name); + } + if (strncmp(error, "SerializationException", 22) == 0) { + /* + * If this happens, we habe a bug in the code + * User should send us the output to debug + */ + flb_plg_error(ctx->ins, "<<------Bug in Code------>>"); + printf("Malformed request: %s", buf->out_buf); + } + flb_aws_print_error(c->resp.payload, c->resp.payload_size, + "PutRecords", ctx->ins); + flb_sds_destroy(error); + } + else { + /* error could not be parsed, print raw response to debug */ + flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload); + } + } + } + + flb_plg_error(ctx->ins, "Failed to send log records to %s", ctx->stream_name); + if (c) { + flb_http_client_destroy(c); + } + return -1; +} + + +void kinesis_flush_destroy(struct flush *buf) +{ + if (buf) { + flb_free(buf->tmp_buf); + flb_free(buf->out_buf); + flb_free(buf->events); + flb_free(buf->event_buf); + flb_free(buf); + } +} -- cgit v1.2.3