From 54deae27eed83a162ee438ef6bad4a23767757dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 21 May 2019 20:56:05 +0200 Subject: Merging upstream version 1.15.0. Signed-off-by: Daniel Baumann --- backends/aws_kinesis/aws_kinesis_put_record.cc | 87 ++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 backends/aws_kinesis/aws_kinesis_put_record.cc (limited to 'backends/aws_kinesis/aws_kinesis_put_record.cc') diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc new file mode 100644 index 000000000..0c8ece68b --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis_put_record.cc @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include +#include +#include +#include +#include +#include "aws_kinesis_put_record.h" + +using namespace Aws; + +SDKOptions options; + +Kinesis::KinesisClient *client; + +struct request_outcome { + Kinesis::Model::PutRecordOutcomeCallable future_outcome; + size_t data_len; +}; + +Vector request_outcomes; + +void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) { + InitAPI(options); + + Client::ClientConfiguration config; + + config.region = region; + config.requestTimeoutMs = timeout; + config.connectTimeoutMs = timeout; + + if(access_key_id && *access_key_id && secret_key && *secret_key) { + client = New("client", Auth::AWSCredentials(access_key_id, secret_key), config); + } else { + client = New("client", config); + } +} + +void kinesis_shutdown() { + Delete(client); + + ShutdownAPI(options); +} + +int kinesis_put_record(const char *stream_name, const char *partition_key, + const char *data, size_t data_len) { + Kinesis::Model::PutRecordRequest request; + + request.SetStreamName(stream_name); + request.SetPartitionKey(partition_key); + request.SetData(Utils::ByteBuffer((unsigned char*) data, data_len)); + + request_outcomes.push_back({client->PutRecordCallable(request), data_len}); + + return 0; +} + +int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) { + Kinesis::Model::PutRecordOutcome outcome; + *sent_bytes = 0; + *lost_bytes = 0; + + for(auto request_outcome = request_outcomes.begin(); request_outcome != request_outcomes.end(); ) { + std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100)); + + if(status == std::future_status::ready || status == std::future_status::deferred) { + outcome = request_outcome->future_outcome.get(); + *sent_bytes += request_outcome->data_len; + + if(!outcome.IsSuccess()) { + *lost_bytes += request_outcome->data_len; + outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX); + } + + request_outcomes.erase(request_outcome); + } else { + ++request_outcome; + } + } + + if(*lost_bytes) { + return 1; + } + + return 0; +} \ No newline at end of file -- cgit v1.2.3