summaryrefslogtreecommitdiffstats
path: root/exporting/aws_kinesis
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 14:31:17 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 14:31:17 +0000
commit8020f71afd34d7696d7933659df2d763ab05542f (patch)
tree2fdf1b5447ffd8bdd61e702ca183e814afdcb4fc /exporting/aws_kinesis
parentInitial commit. (diff)
downloadnetdata-upstream.tar.xz
netdata-upstream.zip
Adding upstream version 1.37.1.upstream/1.37.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'exporting/aws_kinesis')
-rw-r--r--exporting/aws_kinesis/Makefile.am8
-rw-r--r--exporting/aws_kinesis/README.md58
-rw-r--r--exporting/aws_kinesis/aws_kinesis.c218
-rw-r--r--exporting/aws_kinesis/aws_kinesis.h16
-rw-r--r--exporting/aws_kinesis/aws_kinesis_put_record.cc151
-rw-r--r--exporting/aws_kinesis/aws_kinesis_put_record.h35
6 files changed, 486 insertions, 0 deletions
diff --git a/exporting/aws_kinesis/Makefile.am b/exporting/aws_kinesis/Makefile.am
new file mode 100644
index 0000000..161784b
--- /dev/null
+++ b/exporting/aws_kinesis/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/aws_kinesis/README.md b/exporting/aws_kinesis/README.md
new file mode 100644
index 0000000..29dd343
--- /dev/null
+++ b/exporting/aws_kinesis/README.md
@@ -0,0 +1,58 @@
+<!--
+title: "Export metrics to AWS Kinesis Data Streams"
+description: "Archive your Agent's metrics to AWS Kinesis Data Streams for long-term storage, further analysis, or correlation with data from other sources."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/aws_kinesis/README.md
+sidebar_label: AWS Kinesis Data Streams
+-->
+
+# Export metrics to AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis for metric collecting and processing, you should first
+[install](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) AWS SDK for C++.
+`libcrypto`, `libssl`, and `libcurl` are also required to compile Netdata with Kinesis support enabled. Next, Netdata
+should be re-installed from the source. The installer will detect that the required libraries are now available.
+
+If the AWS SDK for C++ is being installed from source, it is useful to set `-DBUILD_ONLY=kinesis`. Otherwise, the
+build process could take a very long time. Note, that the default installation path for the libraries is
+`/usr/local/lib64`. Many Linux distributions don't include this path as the default one for a library search, so it is
+advisable to use the following options to `cmake` while building the AWS SDK:
+
+```sh
+sudo cmake -DCMAKE_INSTALL_PREFIX=/usr -DBUILD_ONLY=kinesis <aws-sdk-cpp sources>
+```
+
+The `-DCMAKE_INSTALL_PREFIX=/usr` option also ensures that
+[third party dependencies](https://github.com/aws/aws-sdk-cpp#third-party-dependencies) are installed in your system
+during the SDK build process.
+
+## Configuration
+
+To enable data sending to the Kinesis service, run `./edit-config exporting.conf` in the Netdata configuration directory
+and set the following options:
+
+```conf
+[kinesis:my_instance]
+ enabled = yes
+ destination = us-east-1
+```
+
+Set the `destination` option to an AWS region.
+
+Set AWS credentials and stream name:
+
+```conf
+ # AWS credentials
+ aws_access_key_id = your_access_key_id
+ aws_secret_access_key = your_secret_access_key
+ # destination stream
+ stream name = your_stream_name
+```
+
+Alternatively, you can set AWS credentials for the `netdata` user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
+
+Netdata automatically computes a partition key for every record with the purpose to distribute records across
+available shards evenly.
+
+
diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c
new file mode 100644
index 0000000..1d89cc7
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,218 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aws_kinesis.h"
+
+/**
+ * Clean AWS Kinesis *
+ */
+void aws_kinesis_cleanup(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+ kinesis_shutdown(instance->connector_specific_data);
+
+ freez(instance->connector_specific_data);
+
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ if (connector_specific_config) {
+ freez(connector_specific_config->auth_key_id);
+ freez(connector_specific_config->secure_key);
+ freez(connector_specific_config->stream_name);
+
+ freez(connector_specific_config);
+ }
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+}
+
+/**
+ * Initialize AWS Kinesis connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_aws_kinesis_instance(struct instance *instance)
+{
+ instance->worker = aws_kinesis_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->variables_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = NULL;
+
+ instance->prepare_header = NULL;
+ instance->check_response = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ if (uv_mutex_init(&instance->mutex))
+ return 1;
+ if (uv_cond_init(&instance->cond_var))
+ return 1;
+
+ if (!instance->engine->aws_sdk_initialized) {
+ aws_sdk_init();
+ instance->engine->aws_sdk_initialized = 1;
+ }
+
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ if (!strcmp(connector_specific_config->stream_name, "")) {
+ error("stream name is a mandatory Kinesis parameter but it is not configured");
+ return 1;
+ }
+
+ kinesis_init(
+ (void *)connector_specific_data,
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ instance->config.timeoutms);
+
+ return 0;
+}
+
+/**
+ * AWS Kinesis connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void aws_kinesis_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data;
+
+ while (!instance->engine->exit) {
+ unsigned long long partition_key_seq = 0;
+ struct stats *stats = &instance->stats;
+
+ uv_mutex_lock(&instance->mutex);
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t buffer_len = buffer_strlen(buffer);
+
+ stats->buffered_bytes = buffer_len;
+
+ size_t sent = 0;
+
+ while (sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
+
+ const char *first_char = buffer_tostring(buffer) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
+ } else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
+ while (record_len && *(first_char + record_len - 1) != '\n')
+ record_len--;
+ }
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(
+ D_EXPORTING,
+ "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu",
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ connector_specific_config->stream_name,
+ partition_key,
+ buffer_len,
+ record_len);
+
+ kinesis_put_record(
+ connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ stats->transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if (unlikely(kinesis_get_result(
+ connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", error_message);
+ error(
+ "EXPORTING: failed to write data to external database '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, sent_bytes, sent_bytes - lost_bytes);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ stats->lost_metrics += (collected_number)(
+ stats->buffered_metrics *
+ (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
+ } else {
+ stats->receptions++;
+ }
+
+ if (unlikely(instance->engine->exit))
+ break;
+ }
+
+ stats->sent_bytes += sent;
+ if (likely(sent == buffer_len))
+ stats->sent_metrics = stats->buffered_metrics;
+
+ buffer_flush(buffer);
+
+ send_internal_metrics(instance);
+
+ stats->buffered_metrics = 0;
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ return;
+#endif
+ }
+
+ aws_kinesis_cleanup(instance);
+}
diff --git a/exporting/aws_kinesis/aws_kinesis.h b/exporting/aws_kinesis/aws_kinesis.h
new file mode 100644
index 0000000..d88a458
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_H
+#define NETDATA_EXPORTING_KINESIS_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX 1024 * 1024
+
+int init_aws_kinesis_instance(struct instance *instance);
+void aws_kinesis_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_KINESIS_H
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.cc b/exporting/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 0000000..62c6b03
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,151 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+static SDKOptions options;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+/**
+ * Initialize AWS SDK API
+ */
+void aws_sdk_init()
+{
+ InitAPI(options);
+}
+
+/**
+ * Shutdown AWS SDK API
+ */
+void aws_sdk_shutdown()
+{
+ ShutdownAPI(options);
+}
+
+/**
+ * Initialize a client and a data structure for request outcomes
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param region AWS region.
+ * @param access_key_id AWS account access key ID.
+ * @param secret_key AWS account secret access key.
+ * @param timeout communication timeout.
+ */
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ Kinesis::KinesisClient *client;
+
+ if (access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+ kinesis_specific_data->client = (void *)client;
+
+ Vector<request_outcome> *request_outcomes;
+
+ request_outcomes = new Vector<request_outcome>;
+ kinesis_specific_data->request_outcomes = (void *)request_outcomes;
+}
+
+/**
+ * Deallocate Kinesis specific data
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ */
+void kinesis_shutdown(void *kinesis_specific_data_p)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Delete((Kinesis::KinesisClient *)kinesis_specific_data->client);
+ delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes;
+}
+
+/**
+ * Send data to the Kinesis service
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param stream_name the name of a stream to send to.
+ * @param partition_key a partition key which automatically maps data to a specific stream.
+ * @param data a data buffer to send to the stream.
+ * @param data_len the length of the data buffer.
+ */
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len));
+
+ ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back(
+ { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len });
+}
+
+/**
+ * Get results from service responses
+ *
+ * @param request_outcomes_p request outcome information.
+ * @param error_message report error message to a caller.
+ * @param sent_bytes report to a caller how many bytes was successfully sent.
+ * @param lost_bytes report to a caller how many bytes was lost during transmission.
+ * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission
+ */
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes)
+{
+ Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p;
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if (status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if (!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+ request_outcomes->erase(request_outcome);
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if (*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.h b/exporting/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 0000000..321baf6
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+#define NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aws_kinesis_specific_data {
+ void *client;
+ void *request_outcomes;
+};
+
+void aws_sdk_init();
+void aws_sdk_shutdown();
+
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout);
+void kinesis_shutdown(void *client);
+
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len);
+
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_EXPORTING_KINESIS_PUT_RECORD_H