summaryrefslogtreecommitdiffstats
path: root/backends/aws_kinesis
diff options
context:
space:
mode:
Diffstat (limited to 'backends/aws_kinesis')
-rw-r--r--backends/aws_kinesis/Makefile.am12
-rw-r--r--backends/aws_kinesis/README.md34
-rw-r--r--backends/aws_kinesis/aws_kinesis.c101
-rw-r--r--backends/aws_kinesis/aws_kinesis.conf10
-rw-r--r--backends/aws_kinesis/aws_kinesis.h14
-rw-r--r--backends/aws_kinesis/aws_kinesis_put_record.cc87
-rw-r--r--backends/aws_kinesis/aws_kinesis_put_record.h25
7 files changed, 283 insertions, 0 deletions
diff --git a/backends/aws_kinesis/Makefile.am b/backends/aws_kinesis/Makefile.am
new file mode 100644
index 000000000..7317b3821
--- /dev/null
+++ b/backends/aws_kinesis/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
+dist_libconfig_DATA = \
+ aws_kinesis.conf \
+ $(NULL) \ No newline at end of file
diff --git a/backends/aws_kinesis/README.md b/backends/aws_kinesis/README.md
new file mode 100644
index 000000000..a9cc77d6e
--- /dev/null
+++ b/backends/aws_kinesis/README.md
@@ -0,0 +1,34 @@
+# Using netdata with AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis as a backend AWS SDK for C++ should be [installed](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) first. `libcrypto`, `libssl`, and `libcurl` are also required to compile netdata with Kinesis support enabled. Next, netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
+
+If AWS SDK for C++ is being installed from sources, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the building process could take a very long time.
+
+## Configuration
+
+To enable data sending to the kinesis backend set the following options in `netdata.conf`:
+```
+[backend]
+ enabled = yes
+ type = kinesis
+ destination = us-east-1
+```
+set the `destination` option to an AWS region.
+
+In the netdata configuration directory run `./edit-config aws_kinesis.conf` and set AWS credentials and stream name:
+```
+# AWS credentials
+aws_access_key_id = your_access_key_id
+aws_secret_access_key = your_secret_access_key
+
+# destination stream
+stream name = your_stream_name
+```
+Alternatively, AWS credentials can be set for the *netdata* user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
+
+A partition key for every record is computed automatically by the netdata with the purpose to distribute records across available shards evenly.
+
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fbackends%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/backends/aws_kinesis/aws_kinesis.c b/backends/aws_kinesis/aws_kinesis.c
new file mode 100644
index 000000000..d8b79364c
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define BACKENDS_INTERNALS
+#include "aws_kinesis.h"
+
+#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
+
+// ----------------------------------------------------------------------------
+// kinesis backend
+
+// read the aws_kinesis.conf file
+int read_kinesis_conf(const char *path, char **access_key_id_p, char **secret_access_key_p, char **stream_name_p)
+{
+ char *access_key_id = *access_key_id_p;
+ char *secret_access_key = *secret_access_key_p;
+ char *stream_name = *stream_name_p;
+
+ if(unlikely(access_key_id)) freez(access_key_id);
+ if(unlikely(secret_access_key)) freez(secret_access_key);
+ if(unlikely(stream_name)) freez(stream_name);
+ access_key_id = NULL;
+ secret_access_key = NULL;
+ stream_name = NULL;
+
+ int line = 0;
+
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/aws_kinesis.conf", path);
+
+ char buffer[CONFIG_FILE_LINE_MAX + 1], *s;
+
+ debug(D_BACKEND, "BACKEND: opening config file '%s'", filename);
+
+ FILE *fp = fopen(filename, "r");
+ if(!fp) {
+ return 1;
+ }
+
+ while(fgets(buffer, CONFIG_FILE_LINE_MAX, fp) != NULL) {
+ buffer[CONFIG_FILE_LINE_MAX] = '\0';
+ line++;
+
+ s = trim(buffer);
+ if(!s || *s == '#') {
+ debug(D_BACKEND, "BACKEND: ignoring line %d of file '%s', it is empty.", line, filename);
+ continue;
+ }
+
+ char *name = s;
+ char *value = strchr(s, '=');
+ if(unlikely(!value)) {
+ error("BACKEND: ignoring line %d ('%s') of file '%s', there is no = in it.", line, s, filename);
+ continue;
+ }
+ *value = '\0';
+ value++;
+
+ name = trim(name);
+ value = trim(value);
+
+ if(unlikely(!name || *name == '#')) {
+ error("BACKEND: ignoring line %d of file '%s', name is empty.", line, filename);
+ continue;
+ }
+
+ if(!value) value = "";
+
+ // strip quotes
+ if(*value == '"' || *value == '\'') {
+ value++;
+
+ s = value;
+ while(*s) s++;
+ if(s != value) s--;
+
+ if(*s == '"' || *s == '\'') *s = '\0';
+ }
+ if(name[0] == 'a' && name[4] == 'a' && !strcmp(name, "aws_access_key_id")) {
+ access_key_id = strdupz(value);
+ }
+ else if(name[0] == 'a' && name[4] == 's' && !strcmp(name, "aws_secret_access_key")) {
+ secret_access_key = strdupz(value);
+ }
+ else if(name[0] == 's' && !strcmp(name, "stream name")) {
+ stream_name = strdupz(value);
+ }
+ }
+
+ fclose(fp);
+
+ if(unlikely(!stream_name || !*stream_name)) {
+ error("BACKEND: stream name is a mandatory Kinesis parameter but it is not configured");
+ return 1;
+ }
+
+ *access_key_id_p = access_key_id;
+ *secret_access_key_p = secret_access_key;
+ *stream_name_p = stream_name;
+
+ return 0;
+}
diff --git a/backends/aws_kinesis/aws_kinesis.conf b/backends/aws_kinesis/aws_kinesis.conf
new file mode 100644
index 000000000..cc54b5fa2
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.conf
@@ -0,0 +1,10 @@
+# AWS Kinesis Data Streams backend configuration
+#
+# All options in this file are mandatory
+
+# AWS credentials
+aws_access_key_id =
+aws_secret_access_key =
+
+# destination stream
+stream name = \ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis.h b/backends/aws_kinesis/aws_kinesis.h
new file mode 100644
index 000000000..50a4631c5
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_H
+#define NETDATA_BACKEND_KINESIS_H
+
+#include "backends/backends.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX 1024 * 1024
+
+extern int read_kinesis_conf(const char *path, char **auth_key_id_p, char **secure_key_p, char **stream_name_p);
+
+#endif //NETDATA_BACKEND_KINESIS_H
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 000000000..0c8ece68b
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,87 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+SDKOptions options;
+
+Kinesis::KinesisClient *client;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+Vector<request_outcome> request_outcomes;
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) {
+ InitAPI(options);
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ if(access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+}
+
+void kinesis_shutdown() {
+ Delete(client);
+
+ ShutdownAPI(options);
+}
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len) {
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char*) data, data_len));
+
+ request_outcomes.push_back({client->PutRecordCallable(request), data_len});
+
+ return 0;
+}
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) {
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for(auto request_outcome = request_outcomes.begin(); request_outcome != request_outcomes.end(); ) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if(status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if(!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+ request_outcomes.erase(request_outcome);
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if(*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+} \ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.h b/backends/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 000000000..f48e420f3
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+#define NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout);
+
+void kinesis_shutdown();
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len);
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_BACKEND_KINESIS_PUT_RECORD_H