summaryrefslogtreecommitdiffstats
path: root/src/exporting/pubsub
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:53:24 +0000
commitb5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch)
treed4d31289c39fc00da064a825df13a0b98ce95b10 /src/exporting/pubsub
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.tar.xz
netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.zip
Adding upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/exporting/pubsub')
l---------src/exporting/pubsub/README.md1
-rw-r--r--src/exporting/pubsub/integrations/google_cloud_pub_sub.md145
-rw-r--r--src/exporting/pubsub/metadata.yaml152
-rw-r--r--src/exporting/pubsub/pubsub.c199
-rw-r--r--src/exporting/pubsub/pubsub.h14
-rw-r--r--src/exporting/pubsub/pubsub_publish.cc258
-rw-r--r--src/exporting/pubsub/pubsub_publish.h37
7 files changed, 806 insertions, 0 deletions
diff --git a/src/exporting/pubsub/README.md b/src/exporting/pubsub/README.md
new file mode 120000
index 000000000..8633f1725
--- /dev/null
+++ b/src/exporting/pubsub/README.md
@@ -0,0 +1 @@
+integrations/google_cloud_pub_sub.md \ No newline at end of file
diff --git a/src/exporting/pubsub/integrations/google_cloud_pub_sub.md b/src/exporting/pubsub/integrations/google_cloud_pub_sub.md
new file mode 100644
index 000000000..1adfd408e
--- /dev/null
+++ b/src/exporting/pubsub/integrations/google_cloud_pub_sub.md
@@ -0,0 +1,145 @@
+<!--startmeta
+custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/exporting/pubsub/README.md"
+meta_yaml: "https://github.com/netdata/netdata/edit/master/src/exporting/pubsub/metadata.yaml"
+sidebar_label: "Google Cloud Pub Sub"
+learn_status: "Published"
+learn_rel_path: "Exporting Metrics"
+message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE EXPORTER'S metadata.yaml FILE"
+endmeta-->
+
+# Google Cloud Pub Sub
+
+
+<img src="https://netdata.cloud/img/pubsub.png" width="150"/>
+
+
+Export metrics to Google Cloud Pub/Sub Service
+
+
+
+<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" />
+
+## Setup
+
+### Prerequisites
+
+####
+
+- First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries
+- Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc`
+- Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
+
+
+
+### Configuration
+
+#### File
+
+The configuration file name for this integration is `exporting.conf`.
+
+
+You can edit the configuration file using the `edit-config` script from the
+Netdata [config directory](/docs/netdata-agent/configuration/README.md#the-netdata-config-directory).
+
+```bash
+cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata
+sudo ./edit-config exporting.conf
+```
+#### Options
+
+The following options can be defined for this exporter.
+
+
+<details open><summary>Config options</summary>
+
+| Name | Description | Default | Required |
+|:----|:-----------|:-------|:--------:|
+| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes |
+| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | pubsub.googleapis.com | yes |
+| username | Username for HTTP authentication | my_username | no |
+| password | Password for HTTP authentication | my_password | no |
+| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no |
+| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no |
+| prefix | The prefix to add to all metrics. | Netdata | no |
+| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no |
+| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no |
+| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no |
+| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#simple-patterns). | localhost * | no |
+| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no |
+| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no |
+| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no |
+| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no |
+
+##### destination
+
+The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
+- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
+- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
+- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
+
+Example IPv4:
+ ```yaml
+ destination = pubsub.googleapis.com
+ ```
+When multiple servers are defined, Netdata will try the next one when the previous one fails.
+
+
+##### update every
+
+Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
+send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
+
+
+##### buffer on failures
+
+If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
+
+
+##### send hosts matching
+
+Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
+The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
+filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
+
+A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
+use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
+
+
+##### send charts matching
+
+A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
+use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
+positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
+has a higher priority than the configuration option.
+
+
+##### send names instead of ids
+
+Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
+are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
+different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
+
+
+</details>
+
+#### Examples
+
+##### Basic configuration
+
+- Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one.
+- Create the credentials JSON file by following Google Cloud's authentication guide.
+- The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set
+ `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json`
+- Set the credentials file option to the full path of the file.
+
+
+```yaml
+[pubsub:my_instance]
+ enabled = yes
+ destination = pubsub.googleapis.com
+ credentials file = /etc/netdata/google_cloud_credentials.json
+ project id = my_project
+ topic id = my_topic
+
+```
+
diff --git a/src/exporting/pubsub/metadata.yaml b/src/exporting/pubsub/metadata.yaml
new file mode 100644
index 000000000..48ec02c47
--- /dev/null
+++ b/src/exporting/pubsub/metadata.yaml
@@ -0,0 +1,152 @@
+# yamllint disable rule:line-length
+---
+id: 'export-google-pubsub'
+meta:
+ name: 'Google Cloud Pub Sub'
+ link: 'https://cloud.google.com/pubsub'
+ categories:
+ - export
+ icon_filename: 'pubsub.png'
+keywords:
+ - exporter
+ - Google Cloud
+ - Pub Sub
+overview:
+ exporter_description: |
+ Export metrics to Google Cloud Pub/Sub Service
+ exporter_limitations: ''
+setup:
+ prerequisites:
+ list:
+ - title: ''
+ description: |
+ - First [install](https://github.com/googleapis/google-cloud-cpp/) install Google Cloud Platform C++ Client Libraries
+ - Pub/Sub support is also dependent on the dependencies of those libraries, like `protobuf`, `protoc`, and `grpc`
+ - Next, Netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
+ configuration:
+ file:
+ name: 'exporting.conf'
+ options:
+ description: |
+ The following options can be defined for this exporter.
+ folding:
+ title: 'Config options'
+ enabled: true
+ list:
+ - name: 'enabled'
+ default_value: 'no'
+ description: 'Enables or disables an exporting connector instance (yes|no).'
+ required: true
+ - name: 'destination'
+ default_value: 'pubsub.googleapis.com'
+ description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.'
+ required: true
+ detailed_description: |
+ The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
+ - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
+ - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
+ - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
+
+ Example IPv4:
+ ```yaml
+ destination = pubsub.googleapis.com
+ ```
+ When multiple servers are defined, Netdata will try the next one when the previous one fails.
+ - name: 'username'
+ default_value: 'my_username'
+ description: 'Username for HTTP authentication'
+ required: false
+ - name: 'password'
+ default_value: 'my_password'
+ description: 'Password for HTTP authentication'
+ required: false
+ - name: 'data source'
+ default_value: ''
+ description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)'
+ required: false
+ - name: 'hostname'
+ default_value: '[global].hostname'
+ description: 'The hostname to be used for sending data to the external database server.'
+ required: false
+ - name: 'prefix'
+ default_value: 'Netdata'
+ description: 'The prefix to add to all metrics.'
+ required: false
+ - name: 'update every'
+ default_value: '10'
+ description: |
+ Frequency of sending sending data to the external database, in seconds.
+ required: false
+ detailed_description: |
+ Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
+ send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
+ - name: 'buffer on failures'
+ default_value: '10'
+ description: |
+ The number of iterations (`update every` seconds) to buffer data, when the external database server is not available.
+ required: false
+ detailed_description: |
+ If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
+ - name: 'timeout ms'
+ default_value: '2 * update_every * 1000'
+ description: 'The timeout in milliseconds to wait for the external database server to process the data.'
+ required: false
+ - name: 'send hosts matching'
+ default_value: 'localhost *'
+ description: |
+ Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#simple-patterns).
+ required: false
+ detailed_description: |
+ Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
+ The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
+ filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
+
+ A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
+ use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
+ - name: 'send charts matching'
+ default_value: '*'
+ description: |
+ One or more space separated patterns (use * as wildcard) checked against both chart id and chart name.
+ required: false
+ detailed_description: |
+ A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
+ use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
+ positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
+ has a higher priority than the configuration option.
+ - name: 'send names instead of ids'
+ default_value: ''
+ description: 'Controls the metric names Netdata should send to the external database (yes|no).'
+ required: false
+ detailed_description: |
+ Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
+ are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
+ different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
+ - name: 'send configured labels'
+ default_value: ''
+ description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).'
+ required: false
+ - name: 'send automatic labels'
+ default_value: ''
+ description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).'
+ required: false
+ examples:
+ folding:
+ enabled: true
+ title: ''
+ list:
+ - name: 'Basic configuration'
+ folding:
+ enabled: false
+ description: |
+ - Set the destination option to a Pub/Sub service endpoint. pubsub.googleapis.com is the default one.
+ - Create the credentials JSON file by following Google Cloud's authentication guide.
+ - The user running the Agent (typically netdata) needs read access to google_cloud_credentials.json, which you can set
+ `chmod 400 google_cloud_credentials.json; chown netdata google_cloud_credentials.json`
+ - Set the credentials file option to the full path of the file.
+ config: |
+ [pubsub:my_instance]
+ enabled = yes
+ destination = pubsub.googleapis.com
+ credentials file = /etc/netdata/google_cloud_credentials.json
+ project id = my_project
+ topic id = my_topic
diff --git a/src/exporting/pubsub/pubsub.c b/src/exporting/pubsub/pubsub.c
new file mode 100644
index 000000000..7fc416258
--- /dev/null
+++ b/src/exporting/pubsub/pubsub.c
@@ -0,0 +1,199 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pubsub.h"
+
+/**
+ * Initialize Pub/Sub connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_pubsub_instance(struct instance *instance)
+{
+ instance->worker = pubsub_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->variables_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = NULL;
+
+ instance->prepare_header = NULL;
+ instance->check_response = NULL;
+
+ instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
+ if (!instance->buffer) {
+ netdata_log_error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ struct pubsub_specific_config *connector_specific_config =
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
+ char error_message[ERROR_LINE_MAX + 1] = "";
+ if (pubsub_init(
+ (void *)connector_specific_data, error_message, instance->config.destination,
+ connector_specific_config->credentials_file, connector_specific_config->project_id,
+ connector_specific_config->topic_id)) {
+ netdata_log_error(
+ "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
+ instance->config.name, error_message);
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Clean a PubSub connector instance
+ *
+ * @param instance an instance data structure.
+ */
+void clean_pubsub_instance(struct instance *instance)
+{
+ netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct pubsub_specific_data *connector_specific_data =
+ (struct pubsub_specific_data *)instance->connector_specific_data;
+ pubsub_cleanup(connector_specific_data);
+ freez(connector_specific_data);
+
+ buffer_free(instance->buffer);
+
+ struct pubsub_specific_config *connector_specific_config =
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->credentials_file);
+ freez(connector_specific_config->project_id);
+ freez(connector_specific_config->topic_id);
+ freez(connector_specific_config);
+
+ netdata_log_info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
+ * Pub/Sub connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void pubsub_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
+
+ char threadname[ND_THREAD_TAG_MAX + 1];
+ snprintfz(threadname, ND_THREAD_TAG_MAX, "EXPPBSB[%zu]", instance->index);
+ uv_thread_set_name_np(threadname);
+
+ while (!instance->engine->exit) {
+ struct stats *stats = &instance->stats;
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ uv_mutex_lock(&instance->mutex);
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t buffer_len = buffer_strlen(buffer);
+
+ stats->buffered_bytes = buffer_len;
+
+ if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
+ netdata_log_error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);
+
+ stats->data_lost_events++;
+ stats->lost_metrics += stats->buffered_metrics;
+ stats->lost_bytes += buffer_len;
+
+ goto cleanup;
+ }
+
+ netdata_log_debug(
+ D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
+ connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);
+
+ if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
+ netdata_log_error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_metrics += stats->buffered_metrics;
+ stats->lost_bytes += buffer_len;
+
+ goto cleanup;
+ }
+
+ stats->sent_bytes = buffer_len;
+ stats->transmission_successes++;
+
+ size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;
+
+ if (unlikely(pubsub_get_result(
+ connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ netdata_log_error("EXPORTING: %s", error_message);
+ netdata_log_error(
+ "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, lost_bytes, sent_bytes);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_metrics += lost_metrics;
+ stats->lost_bytes += lost_bytes;
+ } else {
+ stats->receptions++;
+ stats->sent_metrics = sent_metrics;
+ }
+
+ cleanup:
+ send_internal_metrics(instance);
+
+ buffer_flush(buffer);
+ stats->buffered_metrics = 0;
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ return;
+#endif
+ }
+
+ clean_pubsub_instance(instance);
+}
diff --git a/src/exporting/pubsub/pubsub.h b/src/exporting/pubsub/pubsub.h
new file mode 100644
index 000000000..0bcb76f9b
--- /dev/null
+++ b/src/exporting/pubsub/pubsub.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_PUBSUB_H
+#define NETDATA_EXPORTING_PUBSUB_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include "pubsub_publish.h"
+
+int init_pubsub_instance(struct instance *instance);
+void clean_pubsub_instance(struct instance *instance);
+void pubsub_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_PUBSUB_H
diff --git a/src/exporting/pubsub/pubsub_publish.cc b/src/exporting/pubsub/pubsub_publish.cc
new file mode 100644
index 000000000..cc14154f8
--- /dev/null
+++ b/src/exporting/pubsub/pubsub_publish.cc
@@ -0,0 +1,258 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <google/pubsub/v1/pubsub.grpc.pb.h>
+#include <grpcpp/grpcpp.h>
+#include <stdexcept>
+#include "pubsub_publish.h"
+
+#define EVENT_CHECK_TIMEOUT 50
+
+struct response {
+ grpc::ClientContext *context;
+ google::pubsub::v1::PublishResponse *publish_response;
+ size_t tag;
+ grpc::Status *status;
+
+ size_t published_metrics;
+ size_t published_bytes;
+};
+
+static inline void copy_error_message(char *error_message_dst, const char *error_message_src)
+{
+ std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX);
+ error_message_dst[ERROR_LINE_MAX] = '\0';
+}
+
+/**
+ * Initialize a Pub/Sub client and a data structure for responses.
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param error_message report error message to a caller.
+ * @param destination a Pub/Sub service endpoint.
+ * @param credentials_file a full path for a file with google application credentials.
+ * @param project_id a project ID.
+ * @param topic_id a topic ID.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_init(
+ void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
+ const char *project_id, const char *topic_id)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0);
+
+ std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::GoogleDefaultCredentials();
+ if (credentials == nullptr) {
+ copy_error_message(error_message, "Can't load credentials");
+ return 1;
+ }
+
+ std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(destination, credentials);
+
+ google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel);
+ if (!stub) {
+ copy_error_message(error_message, "Can't create a publisher stub");
+ return 1;
+ }
+
+ connector_specific_data->stub = stub;
+
+ google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest;
+ connector_specific_data->request = request;
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))
+ ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id);
+
+ grpc::CompletionQueue *cq = new grpc::CompletionQueue;
+ connector_specific_data->completion_queue = cq;
+
+ connector_specific_data->responses = new std::list<struct response>;
+
+ return 0;
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Clean the PubSub connector instance specific data
+ */
+void pubsub_cleanup(void *pubsub_specific_data_p)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
+ std::list<struct response>::iterator response;
+ for (response = responses->begin(); response != responses->end(); ++response) {
+ // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of
+ // cleaning up contexts
+ // delete response->context;
+ delete response->publish_response;
+ delete response->status;
+ }
+ delete responses;
+
+ ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown();
+ delete (grpc::CompletionQueue *)connector_specific_data->completion_queue;
+ delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request;
+ delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub;
+
+ // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work.
+ // grpc_shutdown();
+
+ return;
+}
+
+/**
+ * Add data to a Pub/Sub request message.
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param data a text buffer with metrics.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_add_message(void *pubsub_specific_data_p, char *data)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ google::pubsub::v1::PubsubMessage *message =
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages();
+ if (!message)
+ return 1;
+
+ message->set_data(data);
+ } catch (std::exception const &ex) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Send data to the Pub/Sub service
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param error_message report error message to a caller.
+ * @param buffered_metrics the number of metrics we are going to send.
+ * @param buffered_bytes the number of bytes we are going to send.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ try {
+ grpc::ClientContext *context = new grpc::ClientContext;
+
+ std::unique_ptr<grpc::ClientAsyncResponseReader<google::pubsub::v1::PublishResponse> > rpc(
+ ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub))
+ ->AsyncPublish(
+ context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)),
+ ((grpc::CompletionQueue *)(connector_specific_data->completion_queue))));
+
+ struct response response;
+ response.context = context;
+ response.publish_response = new google::pubsub::v1::PublishResponse;
+ response.tag = connector_specific_data->last_tag++;
+ response.status = new grpc::Status;
+ response.published_metrics = buffered_metrics;
+ response.published_bytes = buffered_bytes;
+
+ rpc->Finish(response.publish_response, response.status, (void *)response.tag);
+
+ ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages();
+
+ ((std::list<struct response> *)(connector_specific_data->responses))->push_back(response);
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Get results from service responses
+ *
+ * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
+ * @param error_message report error message to a caller.
+ * @param sent_metrics report to a caller how many metrics was successfully sent.
+ * @param sent_bytes report to a caller how many bytes was successfully sent.
+ * @param lost_metrics report to a caller how many metrics was lost during transmission.
+ * @param lost_bytes report to a caller how many bytes was lost during transmission.
+ * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission.
+ */
+int pubsub_get_result(
+ void *pubsub_specific_data_p, char *error_message,
+ size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+ std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
+ grpc::CompletionQueue::NextStatus next_status;
+
+ *sent_metrics = 0;
+ *sent_bytes = 0;
+ *lost_metrics = 0;
+ *lost_bytes = 0;
+
+ try {
+ do {
+ std::list<struct response>::iterator response;
+ void *got_tag;
+ bool ok = false;
+
+ auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50);
+ next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue))
+ .AsyncNext(&got_tag, &ok, deadline);
+
+ if (next_status == grpc::CompletionQueue::GOT_EVENT) {
+ for (response = responses->begin(); response != responses->end(); ++response) {
+ if ((void *)response->tag == got_tag)
+ break;
+ }
+
+ if (response == responses->end()) {
+ copy_error_message(error_message, "Cannot get Pub/Sub response");
+ return 1;
+ }
+
+ if (ok && response->publish_response->message_ids_size()) {
+ *sent_metrics += response->published_metrics;
+ *sent_bytes += response->published_bytes;
+ } else {
+ *lost_metrics += response->published_metrics;
+ *lost_bytes += response->published_bytes;
+ response->status->error_message().copy(error_message, ERROR_LINE_MAX);
+ error_message[ERROR_LINE_MAX] = '\0';
+ }
+
+ delete response->context;
+ delete response->publish_response;
+ delete response->status;
+ responses->erase(response);
+ }
+
+ if (next_status == grpc::CompletionQueue::SHUTDOWN) {
+ copy_error_message(error_message, "Completion queue shutdown");
+ return 1;
+ }
+
+ } while (next_status == grpc::CompletionQueue::GOT_EVENT);
+ } catch (std::exception const &ex) {
+ std::string em(std::string("Standard exception raised: ") + ex.what());
+ copy_error_message(error_message, em.c_str());
+ return 1;
+ }
+
+ if (*lost_metrics) {
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/src/exporting/pubsub/pubsub_publish.h b/src/exporting/pubsub/pubsub_publish.h
new file mode 100644
index 000000000..567a262f0
--- /dev/null
+++ b/src/exporting/pubsub/pubsub_publish.h
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_PUBSUB_PUBLISH_H
+#define NETDATA_EXPORTING_PUBSUB_PUBLISH_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct pubsub_specific_data {
+ void *stub;
+ void *request;
+ void *completion_queue;
+
+ void *responses;
+ size_t last_tag;
+};
+
+int pubsub_init(
+ void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
+ const char *project_id, const char *topic_id);
+void pubsub_cleanup(void *pubsub_specific_data_p);
+
+int pubsub_add_message(void *pubsub_specific_data_p, char *data);
+
+int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes);
+int pubsub_get_result(
+ void *pubsub_specific_data_p, char *error_message,
+ size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_EXPORTING_PUBSUB_PUBLISH_H