summaryrefslogtreecommitdiffstats
path: root/exporting/aws_kinesis/aws_kinesis_put_record.cc
blob: 62c6b030124ed31b8964474fa9469ff33d98ee3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// SPDX-License-Identifier: GPL-3.0-or-later

#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/utils/Outcome.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/PutRecordRequest.h>
#include "aws_kinesis_put_record.h"

using namespace Aws;

/* SDK options shared by aws_sdk_init() and aws_sdk_shutdown(); the same object is passed to both calls. */
static SDKOptions options;

/**
 * Bookkeeping for one PutRecord request that has been issued.
 *
 * Instances are brace-initialised in kinesis_put_record() as { callable, length },
 * so the order of the members must not be changed.
 */
struct request_outcome {
    /* Future returned by PutRecordCallable(); kinesis_get_result() waits on it and fetches the outcome. */
    Kinesis::Model::PutRecordOutcomeCallable future_outcome;
    /* Length of the data buffer that was passed to kinesis_put_record() for this request. */
    size_t data_len;
};

/**
 * Initialize AWS SDK API
 *
 * Calls InitAPI() with the file-level `options` object. aws_sdk_shutdown() passes the
 * same object to ShutdownAPI().
 */
void aws_sdk_init()
{
    InitAPI(options);
}

/**
 * Shutdown AWS SDK API
 *
 * Calls ShutdownAPI() with the same file-level `options` object that aws_sdk_init()
 * passes to InitAPI().
 */
void aws_sdk_shutdown()
{
    ShutdownAPI(options);
}

/**
 * Initialize a client and a data structure for request outcomes
 *
 * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
 * @param region AWS region.
 * @param access_key_id AWS account access key ID.
 * @param secret_key AWS account secret access key.
 * @param timeout communication timeout.
 */
void kinesis_init(
    void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
    const long timeout)
{
    struct aws_kinesis_specific_data *kinesis_specific_data =
        (struct aws_kinesis_specific_data *)kinesis_specific_data_p;

    Client::ClientConfiguration config;

    config.region = region;
    config.requestTimeoutMs = timeout;
    config.connectTimeoutMs = timeout;

    Kinesis::KinesisClient *client;

    if (access_key_id && *access_key_id && secret_key && *secret_key) {
        client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
    } else {
        client = New<Kinesis::KinesisClient>("client", config);
    }
    kinesis_specific_data->client = (void *)client;

    Vector<request_outcome> *request_outcomes;

    request_outcomes = new Vector<request_outcome>;
    kinesis_specific_data->request_outcomes = (void *)request_outcomes;
}

/**
 * Deallocate Kinesis specific data
 *
 * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
 */
void kinesis_shutdown(void *kinesis_specific_data_p)
{
    struct aws_kinesis_specific_data *kinesis_specific_data =
        (struct aws_kinesis_specific_data *)kinesis_specific_data_p;

    Delete((Kinesis::KinesisClient *)kinesis_specific_data->client);
    delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes;
}

/**
 * Send data to the Kinesis service
 *
 * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
 * @param stream_name the name of a stream to send to.
 * @param partition_key a partition key which automatically maps data to a specific stream.
 * @param data a data buffer to send to the stream.
 * @param data_len the length of the data buffer.
 */
void kinesis_put_record(
    void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
    size_t data_len)
{
    struct aws_kinesis_specific_data *kinesis_specific_data =
        (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
    Kinesis::Model::PutRecordRequest request;

    request.SetStreamName(stream_name);
    request.SetPartitionKey(partition_key);
    request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len));

    ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back(
        { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len });
}

/**
 * Get results from service responses
 *
 * @param request_outcomes_p request outcome information.
 * @param error_message report error message to a caller.
 * @param sent_bytes report to a caller how many bytes was successfully sent.
 * @param lost_bytes report to a caller how many bytes was lost during transmission.
 * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission
 */
int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes)
{
    Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p;
    Kinesis::Model::PutRecordOutcome outcome;
    *sent_bytes = 0;
    *lost_bytes = 0;

    for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) {
        std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));

        if (status == std::future_status::ready || status == std::future_status::deferred) {
            outcome = request_outcome->future_outcome.get();
            *sent_bytes += request_outcome->data_len;

            if (!outcome.IsSuccess()) {
                *lost_bytes += request_outcome->data_len;
                outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
            }

            request_outcomes->erase(request_outcome);
        } else {
            ++request_outcome;
        }
    }

    if (*lost_bytes) {
        return 1;
    }

    return 0;
}