diff options
Diffstat (limited to '')
-rw-r--r-- | backends/aws_kinesis/Makefile.am | 12 | ||||
-rw-r--r-- | backends/aws_kinesis/README.md | 53 | ||||
-rw-r--r-- | backends/aws_kinesis/aws_kinesis.c | 94 | ||||
-rw-r--r-- | backends/aws_kinesis/aws_kinesis.conf | 10 | ||||
-rw-r--r-- | backends/aws_kinesis/aws_kinesis.h | 14 | ||||
-rw-r--r-- | backends/aws_kinesis/aws_kinesis_put_record.cc | 87 | ||||
-rw-r--r-- | backends/aws_kinesis/aws_kinesis_put_record.h | 25 |
7 files changed, 295 insertions, 0 deletions
diff --git a/backends/aws_kinesis/Makefile.am b/backends/aws_kinesis/Makefile.am new file mode 100644 index 0000000..1fec72c --- /dev/null +++ b/backends/aws_kinesis/Makefile.am @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) + +dist_libconfig_DATA = \ + aws_kinesis.conf \ + $(NULL) diff --git a/backends/aws_kinesis/README.md b/backends/aws_kinesis/README.md new file mode 100644 index 0000000..a2b6825 --- /dev/null +++ b/backends/aws_kinesis/README.md @@ -0,0 +1,53 @@ +<!-- +title: "Using Netdata with AWS Kinesis Data Streams" +custom_edit_url: https://github.com/netdata/netdata/edit/master/backends/aws_kinesis/README.md +--> + +# Using Netdata with AWS Kinesis Data Streams + +## Prerequisites + +To use AWS Kinesis as a backend AWS SDK for C++ should be +[installed](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) first. `libcrypto`, `libssl`, +and `libcurl` are also required to compile Netdata with Kinesis support enabled. Next, Netdata should be re-installed +from the source. The installer will detect that the required libraries are now available. + +If the AWS SDK for C++ is being installed from source, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the +building process could take a very long time. Take a note, that the default installation path for the libraries is +`/usr/local/lib64`. Many Linux distributions don't include this path as the default one for a library search, so it is +advisable to use the following options to `cmake` while building the AWS SDK: + +```sh +cmake -DCMAKE_INSTALL_LIBDIR=/usr/lib -DCMAKE_INSTALL_INCLUDEDIR=/usr/include -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=kinesis <aws-sdk-cpp sources> +``` + +## Configuration + +To enable data sending to the kinesis backend set the following options in `netdata.conf`: + +```conf +[backend] + enabled = yes + type = kinesis + destination = us-east-1 +``` + +set the `destination` option to an AWS region. + +In the Netdata configuration directory run `./edit-config aws_kinesis.conf` and set AWS credentials and stream name: + +```yaml +# AWS credentials +aws_access_key_id = your_access_key_id +aws_secret_access_key = your_secret_access_key + +# destination stream +stream name = your_stream_name +``` + +Alternatively, AWS credentials can be set for the `netdata` user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html). + +A partition key for every record is computed automatically by Netdata with the purpose to distribute records across +available shards evenly. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fbackends%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/backends/aws_kinesis/aws_kinesis.c b/backends/aws_kinesis/aws_kinesis.c new file mode 100644 index 0000000..b1ea478 --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis.c @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define BACKENDS_INTERNALS +#include "aws_kinesis.h" + +#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2) + +// ---------------------------------------------------------------------------- +// kinesis backend + +// read the aws_kinesis.conf file +int read_kinesis_conf(const char *path, char **access_key_id_p, char **secret_access_key_p, char **stream_name_p) +{ + char *access_key_id = *access_key_id_p; + char *secret_access_key = *secret_access_key_p; + char *stream_name = *stream_name_p; + + if(unlikely(access_key_id)) freez(access_key_id); + if(unlikely(secret_access_key)) freez(secret_access_key); + if(unlikely(stream_name)) freez(stream_name); + access_key_id = NULL; + secret_access_key = NULL; + stream_name = NULL; + + int line = 0; + + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/aws_kinesis.conf", path); + + char buffer[CONFIG_FILE_LINE_MAX + 1], *s; + + debug(D_BACKEND, "BACKEND: opening config file '%s'", filename); + + FILE *fp = fopen(filename, "r"); + if(!fp) { + return 1; + } + + while(fgets(buffer, CONFIG_FILE_LINE_MAX, fp) != NULL) { + buffer[CONFIG_FILE_LINE_MAX] = '\0'; + line++; + + s = trim(buffer); + if(!s || *s == '#') { + debug(D_BACKEND, "BACKEND: ignoring line %d of file '%s', it is empty.", line, filename); + continue; + } + + char *name = s; + char *value = strchr(s, '='); + if(unlikely(!value)) { + error("BACKEND: ignoring line %d ('%s') of file '%s', there is no = in it.", line, s, filename); + continue; + } + *value = '\0'; + value++; + + name = trim(name); + value = trim(value); + + if(unlikely(!name || *name == '#')) { + error("BACKEND: ignoring line %d of file '%s', name is empty.", line, filename); + continue; + } + + if(!value) + value = ""; + else + value = strip_quotes(value); + + if(name[0] == 'a' && name[4] == 'a' && !strcmp(name, "aws_access_key_id")) { + access_key_id = strdupz(value); + } + else if(name[0] == 'a' && name[4] == 's' && !strcmp(name, "aws_secret_access_key")) { + secret_access_key = strdupz(value); + } + else if(name[0] == 's' && !strcmp(name, "stream name")) { + stream_name = strdupz(value); + } + } + + fclose(fp); + + if(unlikely(!stream_name || !*stream_name)) { + error("BACKEND: stream name is a mandatory Kinesis parameter but it is not configured"); + return 1; + } + + *access_key_id_p = access_key_id; + *secret_access_key_p = secret_access_key; + *stream_name_p = stream_name; + + return 0; +} diff --git a/backends/aws_kinesis/aws_kinesis.conf b/backends/aws_kinesis/aws_kinesis.conf new file mode 100644 index 0000000..cc54b5f --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis.conf @@ -0,0 +1,10 @@ +# AWS Kinesis Data Streams backend configuration +# +# All options in this file are mandatory + +# AWS credentials +aws_access_key_id = +aws_secret_access_key = + +# destination stream +stream name =
\ No newline at end of file diff --git a/backends/aws_kinesis/aws_kinesis.h b/backends/aws_kinesis/aws_kinesis.h new file mode 100644 index 0000000..50a4631 --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_BACKEND_KINESIS_H +#define NETDATA_BACKEND_KINESIS_H + +#include "backends/backends.h" +#include "aws_kinesis_put_record.h" + +#define KINESIS_PARTITION_KEY_MAX 256 +#define KINESIS_RECORD_MAX 1024 * 1024 + +extern int read_kinesis_conf(const char *path, char **auth_key_id_p, char **secure_key_p, char **stream_name_p); + +#endif //NETDATA_BACKEND_KINESIS_H diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc new file mode 100644 index 0000000..a8ba4aa --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis_put_record.cc @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include <aws/core/Aws.h> +#include <aws/core/client/ClientConfiguration.h> +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/utils/Outcome.h> +#include <aws/kinesis/KinesisClient.h> +#include <aws/kinesis/model/PutRecordRequest.h> +#include "aws_kinesis_put_record.h" + +using namespace Aws; + +static SDKOptions options; + +static Kinesis::KinesisClient *client; + +struct request_outcome { + Kinesis::Model::PutRecordOutcomeCallable future_outcome; + size_t data_len; +}; + +static Vector<request_outcome> request_outcomes; + +void backends_kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) { + InitAPI(options); + + Client::ClientConfiguration config; + + config.region = region; + config.requestTimeoutMs = timeout; + config.connectTimeoutMs = timeout; + + if(access_key_id && *access_key_id && secret_key && *secret_key) { + client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config); + } else { + client = New<Kinesis::KinesisClient>("client", config); + } +} + +void backends_kinesis_shutdown() { + Delete(client); + + ShutdownAPI(options); +} + +int backends_kinesis_put_record(const char *stream_name, const char *partition_key, + const char *data, size_t data_len) { + Kinesis::Model::PutRecordRequest request; + + request.SetStreamName(stream_name); + request.SetPartitionKey(partition_key); + request.SetData(Utils::ByteBuffer((unsigned char*) data, data_len)); + + request_outcomes.push_back({client->PutRecordCallable(request), data_len}); + + return 0; +} + +int backends_kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) { + Kinesis::Model::PutRecordOutcome outcome; + *sent_bytes = 0; + *lost_bytes = 0; + + for(auto request_outcome = request_outcomes.begin(); request_outcome != request_outcomes.end(); ) { + std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100)); + + if(status == std::future_status::ready || status == std::future_status::deferred) { + outcome = request_outcome->future_outcome.get(); + *sent_bytes += request_outcome->data_len; + + if(!outcome.IsSuccess()) { + *lost_bytes += request_outcome->data_len; + outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX); + } + + request_outcomes.erase(request_outcome); + } else { + ++request_outcome; + } + } + + if(*lost_bytes) { + return 1; + } + + return 0; +}
\ No newline at end of file diff --git a/backends/aws_kinesis/aws_kinesis_put_record.h b/backends/aws_kinesis/aws_kinesis_put_record.h new file mode 100644 index 0000000..fa3d034 --- /dev/null +++ b/backends/aws_kinesis/aws_kinesis_put_record.h @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_BACKEND_KINESIS_PUT_RECORD_H +#define NETDATA_BACKEND_KINESIS_PUT_RECORD_H + +#define ERROR_LINE_MAX 1023 + +#ifdef __cplusplus +extern "C" { +#endif + +void backends_kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout); + +void backends_kinesis_shutdown(); + +int backends_kinesis_put_record(const char *stream_name, const char *partition_key, + const char *data, size_t data_len); + +int backends_kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes); + +#ifdef __cplusplus +} +#endif + +#endif //NETDATA_BACKEND_KINESIS_PUT_RECORD_H |