summaryrefslogtreecommitdiffstats
path: root/exporting/pubsub
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--exporting/pubsub/Makefile.am8
-rw-r--r--exporting/pubsub/README.md38
-rw-r--r--exporting/pubsub/pubsub.c195
-rw-r--r--exporting/pubsub/pubsub.h14
-rw-r--r--exporting/pubsub/pubsub_publish.cc258
-rw-r--r--exporting/pubsub/pubsub_publish.h37
6 files changed, 550 insertions, 0 deletions
diff --git a/exporting/pubsub/Makefile.am b/exporting/pubsub/Makefile.am
new file mode 100644
index 0000000..161784b
--- /dev/null
+++ b/exporting/pubsub/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/pubsub/README.md b/exporting/pubsub/README.md
new file mode 100644
index 0000000..2f9ac83
--- /dev/null
+++ b/exporting/pubsub/README.md
@@ -0,0 +1,38 @@
+<!--
+title: "Export metrics to Google Cloud Pub/Sub Service"
+description: "Export Netdata metrics to the Google Cloud Pub/Sub Service for long-term archiving or analytical processing."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/pubsub/README.md
+sidebar_label: Google Cloud Pub/Sub Service
+-->
+
+# Export metrics to Google Cloud Pub/Sub Service
+
+## Prerequisites
+
+To use the Pub/Sub service for metric collecting and processing, you should first
+[install](https://github.com/googleapis/google-cloud-cpp/) Google Cloud Platform C++ Client Libraries.
+Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc`. Next,
+Netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
+
+## Configuration
+
+To enable data sending to the Pub/Sub service, run `./edit-config exporting.conf` in the Netdata configuration directory
+and set the following options:
+
+```conf
+[pubsub:my_instance]
+ enabled = yes
+ destination = pubsub.googleapis.com
+ credentials file = /etc/netdata/google_cloud_credentials.json
+ project id = my_project
+ topic id = my_topic
+```
+
+Set the `destination` option to a Pub/Sub service endpoint. `pubsub.googleapis.com` is the default one.
+
+Next, create the credentials JSON file by following Google Cloud's [authentication guide](https://cloud.google.com/docs/authentication/getting-started#creating_a_service_account). The user running the Agent
+(typically `netdata`) needs read access to `google_cloud_credentials.json`, which you can set with
+`chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json`. Set the `credentials file`
+option to the full path of the file.
+
+
diff --git a/exporting/pubsub/pubsub.c b/exporting/pubsub/pubsub.c
new file mode 100644
index 0000000..b218338
--- /dev/null
+++ b/exporting/pubsub/pubsub.c
@@ -0,0 +1,195 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pubsub.h"
+
+/**
+ * Initialize Pub/Sub connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_pubsub_instance(struct instance *instance)
+{
+ instance->worker = pubsub_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->variables_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = NULL;
+
+ instance->prepare_header = NULL;
+ instance->check_response = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ struct pubsub_specific_config *connector_specific_config =
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
+ char error_message[ERROR_LINE_MAX + 1] = "";
+ if (pubsub_init(
+ (void *)connector_specific_data, error_message, instance->config.destination,
+ connector_specific_config->credentials_file, connector_specific_config->project_id,
+ connector_specific_config->topic_id)) {
+ error(
+ "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
+ instance->config.name, error_message);
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Clean a PubSub connector instance
+ *
+ * @param instance an instance data structure.
+ */
+void clean_pubsub_instance(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct pubsub_specific_data *connector_specific_data =
+ (struct pubsub_specific_data *)instance->connector_specific_data;
+ pubsub_cleanup(connector_specific_data);
+ freez(connector_specific_data);
+
+ buffer_free(instance->buffer);
+
+ struct pubsub_specific_config *connector_specific_config =
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->credentials_file);
+ freez(connector_specific_config->project_id);
+ freez(connector_specific_config->topic_id);
+ freez(connector_specific_config);
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
+ * Pub/Sub connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void pubsub_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
+
+ while (!instance->engine->exit) {
+ struct stats *stats = &instance->stats;
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ uv_mutex_lock(&instance->mutex);
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t buffer_len = buffer_strlen(buffer);
+
+ stats->buffered_bytes = buffer_len;
+
+ if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
+ error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);
+
+ stats->data_lost_events++;
+ stats->lost_metrics += stats->buffered_metrics;
+ stats->lost_bytes += buffer_len;
+
+ goto cleanup;
+ }
+
+ debug(
+ D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
+ connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);
+
+ if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
+ error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_metrics += stats->buffered_metrics;
+ stats->lost_bytes += buffer_len;
+
+ goto cleanup;
+ }
+
+ stats->sent_bytes = buffer_len;
+ stats->transmission_successes++;
+
+ size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;
+
+ if (unlikely(pubsub_get_result(
+ connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", error_message);
+ error(
+ "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, lost_bytes, sent_bytes);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_metrics += lost_metrics;
+ stats->lost_bytes += lost_bytes;
+ } else {
+ stats->receptions++;
+ stats->sent_metrics = sent_metrics;
+ }
+
+ cleanup:
+ send_internal_metrics(instance);
+
+ buffer_flush(buffer);
+ stats->buffered_metrics = 0;
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ return;
+#endif
+ }
+
+ clean_pubsub_instance(instance);
+}
diff --git a/exporting/pubsub/pubsub.h b/exporting/pubsub/pubsub.h
new file mode 100644
index 0000000..0bcb76f
--- /dev/null
+++ b/exporting/pubsub/pubsub.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_PUBSUB_H
+#define NETDATA_EXPORTING_PUBSUB_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include "pubsub_publish.h"
+
+int init_pubsub_instance(struct instance *instance);
+void clean_pubsub_instance(struct instance *instance);
+void pubsub_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_PUBSUB_H
diff --git a/exporting/pubsub/pubsub_publish.cc b/exporting/pubsub/pubsub_publish.cc
new file mode 100644
index 0000000..cc14154
--- /dev/null
+++ b/exporting/pubsub/pubsub_publish.cc
@@ -0,0 +1,258 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <google/pubsub/v1/pubsub.grpc.pb.h>
+#include <grpcpp/grpcpp.h>
+#include <stdexcept>
+#include "pubsub_publish.h"
+
+#define EVENT_CHECK_TIMEOUT 50
+
+struct response {
+ grpc::ClientContext *context;
+ google::pubsub::v1::PublishResponse *publish_response;
+ size_t tag;
+ grpc::Status *status;
+
+ size_t published_metrics;
+ size_t published_bytes;
+};
+
+static inline void copy_error_message(char *error_message_dst, const char *error_message_src)
+{
+ std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX);
+ error_message_dst[ERROR_LINE_MAX] = '\0';
+}
+
+/**
+ * Initialize a Pub/Sub client and a data structure for responses.
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param error_message report error message to a caller.
+ * @param destination a Pub/Sub service endpoint.
+ * @param credentials_file a full path for a file with google application credentials.
+ * @param project_id a project ID.
+ * @param topic_id a topic ID.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_init(
+ void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
+ const char *project_id, const char *topic_id)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0);
+
+ std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::GoogleDefaultCredentials();
+ if (credentials == nullptr) {
+ copy_error_message(error_message, "Can't load credentials");
+ return 1;
+ }
+
+ std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(destination, credentials);
+
+ google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel);
+ if (!stub) {
+ copy_error_message(error_message, "Can't create a publisher stub");
+ return 1;
+ }
+
+ connector_specific_data->stub = stub;
+
+ google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest;
+ connector_specific_data->request = request;
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))
+ ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id);
+
+ grpc::CompletionQueue *cq = new grpc::CompletionQueue;
+ connector_specific_data->completion_queue = cq;
+
+ connector_specific_data->responses = new std::list<struct response>;
+
+ return 0;
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Clean the PubSub connector instance specific data
+ */
+void pubsub_cleanup(void *pubsub_specific_data_p)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
+ std::list<struct response>::iterator response;
+ for (response = responses->begin(); response != responses->end(); ++response) {
+ // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of
+ // cleaning up contexts
+ // delete response->context;
+ delete response->publish_response;
+ delete response->status;
+ }
+ delete responses;
+
+ ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown();
+ delete (grpc::CompletionQueue *)connector_specific_data->completion_queue;
+ delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request;
+ delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub;
+
+ // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work.
+ // grpc_shutdown();
+
+ return;
+}
+
+/**
+ * Add data to a Pub/Sub request message.
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param data a text buffer with metrics.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_add_message(void *pubsub_specific_data_p, char *data)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ google::pubsub::v1::PubsubMessage *message =
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages();
+ if (!message)
+ return 1;
+
+ message->set_data(data);
+ } catch (std::exception const &ex) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Send data to the Pub/Sub service
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param error_message report error message to a caller.
+ * @param buffered_metrics the number of metrics we are going to send.
+ * @param buffered_bytes the number of bytes we are going to send.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ grpc::ClientContext *context = new grpc::ClientContext;
+
+ std::unique_ptr<grpc::ClientAsyncResponseReader<google::pubsub::v1::PublishResponse> > rpc(
+ ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub))
+ ->AsyncPublish(
+ context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)),
+ ((grpc::CompletionQueue *)(connector_specific_data->completion_queue))));
+
+ struct response response;
+ response.context = context;
+ response.publish_response = new google::pubsub::v1::PublishResponse;
+ response.tag = connector_specific_data->last_tag++;
+ response.status = new grpc::Status;
+ response.published_metrics = buffered_metrics;
+ response.published_bytes = buffered_bytes;
+
+ rpc->Finish(response.publish_response, response.status, (void *)response.tag);
+
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages();
+
+ ((std::list<struct response> *)(connector_specific_data->responses))->push_back(response);
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Get results from service responses
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param error_message report error message to a caller.
+ * @param sent_metrics report to a caller how many metrics was successfully sent.
+ * @param sent_bytes report to a caller how many bytes was successfully sent.
+ * @param lost_metrics report to a caller how many metrics was lost during transmission.
+ * @param lost_bytes report to a caller how many bytes was lost during transmission.
+ * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission.
+ */
+int pubsub_get_result(
+ void *pubsub_specific_data_p, char *error_message,
+ size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+ std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
+ grpc::CompletionQueue::NextStatus next_status;
+
+ *sent_metrics = 0;
+ *sent_bytes = 0;
+ *lost_metrics = 0;
+ *lost_bytes = 0;
+
+ try {
+ do {
+ std::list<struct response>::iterator response;
+ void *got_tag;
+ bool ok = false;
+
+ auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50);
+ next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue))
+ .AsyncNext(&got_tag, &ok, deadline);
+
+ if (next_status == grpc::CompletionQueue::GOT_EVENT) {
+ for (response = responses->begin(); response != responses->end(); ++response) {
+ if ((void *)response->tag == got_tag)
+ break;
+ }
+
+ if (response == responses->end()) {
+ copy_error_message(error_message, "Cannot get Pub/Sub response");
+ return 1;
+ }
+
+ if (ok && response->publish_response->message_ids_size()) {
+ *sent_metrics += response->published_metrics;
+ *sent_bytes += response->published_bytes;
+ } else {
+ *lost_metrics += response->published_metrics;
+ *lost_bytes += response->published_bytes;
+ response->status->error_message().copy(error_message, ERROR_LINE_MAX);
+ error_message[ERROR_LINE_MAX] = '\0';
+ }
+
+ delete response->context;
+ delete response->publish_response;
+ delete response->status;
+ responses->erase(response);
+ }
+
+ if (next_status == grpc::CompletionQueue::SHUTDOWN) {
+ copy_error_message(error_message, "Completion queue shutdown");
+ return 1;
+ }
+
+ } while (next_status == grpc::CompletionQueue::GOT_EVENT);
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ if (*lost_metrics) {
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/exporting/pubsub/pubsub_publish.h b/exporting/pubsub/pubsub_publish.h
new file mode 100644
index 0000000..567a262
--- /dev/null
+++ b/exporting/pubsub/pubsub_publish.h
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_PUBSUB_PUBLISH_H
+#define NETDATA_EXPORTING_PUBSUB_PUBLISH_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct pubsub_specific_data {
+ void *stub;
+ void *request;
+ void *completion_queue;
+
+ void *responses;
+ size_t last_tag;
+};
+
+int pubsub_init(
+ void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
+ const char *project_id, const char *topic_id);
+void pubsub_cleanup(void *pubsub_specific_data_p);
+
+int pubsub_add_message(void *pubsub_specific_data_p, char *data);
+
+int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes);
+int pubsub_get_result(
+ void *pubsub_specific_data_p, char *error_message,
+ size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_EXPORTING_PUBSUB_PUBLISH_H