diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-21 17:19:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-21 17:19:04 +0000 |
commit | 310edf444908b09ea6d00c03baceb7925f3bb7a2 (patch) | |
tree | 7064577c7fa7a851e2e930beb606ea8237b0bbd2 /exporting/aws_kinesis/aws_kinesis_put_record.cc | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-310edf444908b09ea6d00c03baceb7925f3bb7a2.tar.xz netdata-310edf444908b09ea6d00c03baceb7925f3bb7a2.zip |
Merging upstream version 1.45.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'exporting/aws_kinesis/aws_kinesis_put_record.cc')
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis_put_record.cc | 151 |
1 files changed, 0 insertions, 151 deletions
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.cc b/exporting/aws_kinesis/aws_kinesis_put_record.cc deleted file mode 100644 index 62c6b0301..000000000 --- a/exporting/aws_kinesis/aws_kinesis_put_record.cc +++ /dev/null @@ -1,151 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include <aws/core/Aws.h> -#include <aws/core/client/ClientConfiguration.h> -#include <aws/core/auth/AWSCredentials.h> -#include <aws/core/utils/Outcome.h> -#include <aws/kinesis/KinesisClient.h> -#include <aws/kinesis/model/PutRecordRequest.h> -#include "aws_kinesis_put_record.h" - -using namespace Aws; - -static SDKOptions options; - -struct request_outcome { - Kinesis::Model::PutRecordOutcomeCallable future_outcome; - size_t data_len; -}; - -/** - * Initialize AWS SDK API - */ -void aws_sdk_init() -{ - InitAPI(options); -} - -/** - * Shutdown AWS SDK API - */ -void aws_sdk_shutdown() -{ - ShutdownAPI(options); -} - -/** - * Initialize a client and a data structure for request outcomes - * - * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. - * @param region AWS region. - * @param access_key_id AWS account access key ID. - * @param secret_key AWS account secret access key. - * @param timeout communication timeout. - */ -void kinesis_init( - void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key, - const long timeout) -{ - struct aws_kinesis_specific_data *kinesis_specific_data = - (struct aws_kinesis_specific_data *)kinesis_specific_data_p; - - Client::ClientConfiguration config; - - config.region = region; - config.requestTimeoutMs = timeout; - config.connectTimeoutMs = timeout; - - Kinesis::KinesisClient *client; - - if (access_key_id && *access_key_id && secret_key && *secret_key) { - client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config); - } else { - client = New<Kinesis::KinesisClient>("client", config); - } - kinesis_specific_data->client = (void *)client; - - Vector<request_outcome> *request_outcomes; - - request_outcomes = new Vector<request_outcome>; - kinesis_specific_data->request_outcomes = (void *)request_outcomes; -} - -/** - * Deallocate Kinesis specific data - * - * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. - */ -void kinesis_shutdown(void *kinesis_specific_data_p) -{ - struct aws_kinesis_specific_data *kinesis_specific_data = - (struct aws_kinesis_specific_data *)kinesis_specific_data_p; - - Delete((Kinesis::KinesisClient *)kinesis_specific_data->client); - delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes; -} - -/** - * Send data to the Kinesis service - * - * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. - * @param stream_name the name of a stream to send to. - * @param partition_key a partition key which automatically maps data to a specific stream. - * @param data a data buffer to send to the stream. - * @param data_len the length of the data buffer. - */ -void kinesis_put_record( - void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data, - size_t data_len) -{ - struct aws_kinesis_specific_data *kinesis_specific_data = - (struct aws_kinesis_specific_data *)kinesis_specific_data_p; - Kinesis::Model::PutRecordRequest request; - - request.SetStreamName(stream_name); - request.SetPartitionKey(partition_key); - request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len)); - - ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back( - { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len }); -} - -/** - * Get results from service responses - * - * @param request_outcomes_p request outcome information. - * @param error_message report error message to a caller. - * @param sent_bytes report to a caller how many bytes was successfully sent. - * @param lost_bytes report to a caller how many bytes was lost during transmission. - * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission - */ -int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes) -{ - Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p; - Kinesis::Model::PutRecordOutcome outcome; - *sent_bytes = 0; - *lost_bytes = 0; - - for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) { - std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100)); - - if (status == std::future_status::ready || status == std::future_status::deferred) { - outcome = request_outcome->future_outcome.get(); - *sent_bytes += request_outcome->data_len; - - if (!outcome.IsSuccess()) { - *lost_bytes += request_outcome->data_len; - outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX); - } - - request_outcomes->erase(request_outcome); - } else { - ++request_outcome; - } - } - - if (*lost_bytes) { - return 1; - } - - return 0; -} |